Flink
什么是flink
flink是一个分布式 ,高性能 ,随时可用的以及准确的流处理计算框架 ,
flink可以对无界数据(流处理)和有界数据(批处理)进行有状态计算(flink天生支持状态计算)的分布式 ,高性能的计算框架。
flink的基石
flink的四大基石:checkpoint,state,time,window
- checkpoint:基于chandy-lamport算法实现分布式计算任务的一致性语义;
- state:flink中的状态机制 ,flink天生支持state,state可以认为程序的中间计算结果或者是历史计算结果;
- time:flink中支持基于事件时间和处理时间进行计算 ,spark streaming只能按照process time进行处理;基于事件时间的计算我们可以解决数据迟到和乱序等问题。
- window:flink提供了更多丰富的window,基于时间 ,基于数量 ,session window,同样支持滚动和滑动窗口的计算。
flink流处理和批处理
- 流处理:无界 ,实时性有要求 ,只需对经过程序的每条数据进行处理
- 批处理:有界 ,持久 ,需要对全部数据进行访问处理;
spark vs flink
- spark:spark生态中是把所有的计算都当做批处理 ,spark streaming中流处理本质上也是批处理(micro batch);
- flink:flink中是把批处理(有界数据集的处理)看成是一个特殊的流处理场景;flink中所有计算都是流式计算;
flink中重要角色
- JobManager:类似spark中master ,负责资源申请 ,任务分发 ,任务调度执行 ,checkpoint的协调执行;可以搭建HA ,双master。
- TaskManager:类似spark中的worker ,负责任务的执行 ,基于dataflow(spark中DAG)划分出的task;与jobmanager保持心跳 ,汇报任务状态。
无界数据和有界数据
- 无界数据流:数据流是有一个开始但是没有结束;
- 有界数据流:数据流是有一个明确的开始和结束 ,数据流是有边界的。
flink处理流批处理的思想是:
flink支持的runtime(core 分布式流计算)支持的是无界数据流 ,但是对flink来说可以支持批处理 ,只是从数据流上来说把有界数据流只是无界数据流的一个特例 ,无界数据流只要添加上边界就是有界数据流。
flink编程模型
flink提供了四种编程模型:
- SQL:基于Table API ,提供了流处理和批处理两种编程模型 ,提供了SQL接口;
- Table API:基于DataSet API ,提供了流处理和批处理两种编程模型 ,提供了SQL接口;
- DataStream API:最底层编程模型 ,基于dataflow模型 ,提供了流处理和批处理两种编程模型;DataSet API:基于dataflow模型 ,只支持批处理;
- Stateful Stream Processing:基于dataflow模型 ,支持有状态计算。
Libraries支持
- Flink ML:基于DataStream API ,提供了机器学习算法库;
- Gelly:基于DataSet API ,提供了图计算库;
- CEP:基于DataStream API ,提供了复杂事件处理库;
- Table:基于Table API ,提供了流处理和批处理两种编程模型 ,提供了SQL接口。
flink的安装模式
- local:单机模式 ,尽量不使用
- standalone:flink自带集群 ,资源管理由flink集群管理
- flink on yarn: 把资源管理交给yarn实现。
Flink on Yarn
Per-Job 模式:
每次提交作业时,Flink 都会为该作业启动一个新的独立集群,等到作业运行结束后再关闭集群。这种模式确保了隔离性,每个作业都在自己的 Flink 集群上运行,不会和其它作业相互干扰。所以 Per-Job 模式特别适合资源敏感和需要隔离的长时间运行的作业,但如果有很多短时间的作业需要运行,那么频繁地启动和停止集群将会导致大量的资源浪费和时间开销。Application 模式:
类似于 Per-Job 模式,因为在这种模式下,Flink 集群的生命周期也是与单个应用程序的生命周期绑定的。不过 Application 模式通常更适合云环境,以及使用 YARN、Kubernetes 做资源管理,其中集群的启动和应用程序的提交可以被打包成一个单一的步骤。这种模式同样适合资源敏感和需要隔离的作业,并且还包含所有依赖的时候。Session 模式下:
会先启动一个 Flink 集群,然后等待作业提交。这种模式适合于多个作业需要反复或同时运行在同一个 Flink 集群上的场景,因为它可以避免每次作业运行时都重复启动和停止集群的开销。
大佬详细介绍博客 https://www.cnblogs.com/wan-ming-zhu/p/18050046
Local部署
官方文档
https://flink.apache.org/downloads/
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/try-flink/local_installation/
# kube-41 Local部署
wget -O /usr/local/jdk-11.0.18_linux-x64_bin.tar.gz https://home.vimll.com:9999/download/jdk/jdk-11.0.18_linux-x64_bin.tar.gz
tar xf /usr/local/jdk-11.0.18_linux-x64_bin.tar.gz
wget -O /usr/local/flink-1.18.1-bin-scala_2.12.tgz https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
tar -xzf /usr/local/flink-1.18.1-bin-scala_2.12.tgz
cd flink-1.18.1/
# 指定运行JAVA环境
vim conf/flink-conf.yaml
python.client.executable: /usr/bin/python3
env.java.home: /usr/local/jdk-11.0.18
containerized.master.env.JAVA_HOME: /usr/local/jdk-11.0.18
containerized.taskmanager.env.JAVA_HOME: /usr/local/jdk-11.0.18
# jobManager 的IP地址
jobmanager.rpc.address: 172.16.100.41
# JobManager 的端口号
jobmanager.rpc.port: 6123
# JobManager 监控IP
jobmanager.bind-host: 0.0.0.0
# JobManager JVM heap 内存大小
jobmanager.memory.process.size: 10240m
# TaskManager 监控IP
taskmanager.bind-host: 0.0.0.0
# 配置为本机task节点ip地址,如测试环境有多个ip,则配置为集群中可以相互通信的ip
taskmanager.host: 172.16.100.41
# TaskManager JVM heap 内存大小
taskmanager.memory.process.size: 20480m
# 每个 TaskManager 提供的任务 slots 数量大小 ,默认为1
taskmanager.numberOfTaskSlots: 8
# 程序默认并行计算的个数 ,默认为1
parallelism.default: 8
rest.port: 8081
rest.address: 0.0.0.0
# web服务绑定的address
rest.bind-address: 0.0.0.0
vim /etc/profile
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
export FLINK_HOME=/usr/local/flink-1.18.1
export PATH=$FLINK_HOME/bin:$PATH
source /etc/profile
vim bin/pyflink-shell.sh
PYFLINK_PYTHON="${PYFLINK_PYTHON:-"python3"}"
yum install -y xz-devel mesa-libGL python-backports-lzma
cd /usr/local/python-3.8.20
./configure --prefix=/usr/local/python-3.8.20
make
sudo make altinstall
/usr/local/python-3.8.20/bin/python3.8 -m pip install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple
# 安装依赖
pip3 install apache-flink==1.18.1 -i https://pypi.tuna.tsinghua.edu.cn/simple
# pip3 install backports.lzma -i https://pypi.tuna.tsinghua.edu.cn/simple python3.3版本以下 ,以上版本自带lzma ,源码安装依赖xz-devel,无则不安装lzma
# 启动shell交互式窗口
./bin/pyflink-shell.sh local
# 启动集群
./bin/start-cluster.sh
# 提交作业测试
./bin/flink run examples/batch/WordCount.jar --input /root/words.txt --output /root/out2
# 访问web UI
http://localhost:8081
# 停止集群
./bin/stop-cluster.sh
standalone模式
### 基于local配置增加以下配置
vim workers
172.16.100.41
172.16.100.42
172.16.100.43
vi masters
172.16.100.41:8081
# 将以上配置向集群其他节点同步 ,并修改
# 仅需要修改flink-conf.yaml中的taskmanager.host地址即可 ,修改为当前节点的IP地址
# 启动集群
/usr/local/flink-1.18.1/bin/start-cluster.sh
# 提交任务
/usr/local/flink-1.18.1/bin/flink run /usr/local/flink-1.18.1/examples/batch/WordCount.jar
--input hdfs://kube-40:8020/wordcount/input/words.txt --output hdfs://kube-40:8020/wordcount/output/result.txt --parallelism 2
standalone HA集群搭建
# 修改conf/flink-conf.yaml
#开启HA ,使用文件系统作为快照存储
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://kube-40:8020/flink-checkpoints
#使用zookeeper搭建高可用
high-availability: zookeeper
high-availability.storageDir: hdfs://kube-40:8020/flink/ha/
high-availability.zookeeper.quorum: kube-40:2181,kube-41:2181,kube-42:2181
# 默认是 open ,如果 zookeeper security 启用了更改成 creator
high-availability.zookeeper.client.acl: open
# 设置savepoints 的默认目标目录(可选)
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# 用于启用/禁用增量 checkpoints 的标志
# state.backend.incremental: false
vi masters
172.16.100.41:8081
172.16.100.42:8081
# 分发master,flink-conf.yaml
# 在kube-42节点上 ,修改flink-conf.yaml中jobmanager.rpc.address: 172.16.100.42
# 启动HA集群
./bin/start-cluster.sh
Flink TaskManager 内存调优
JVM Heap
JVM Heap中分为两大部分,一个是Flink 框架所需要使用的堆内存,还有一个是每个Task运行所需的对内存。
我们可以看到,当前Flink TaskManager框架自身配置的内存是128MB。而Task Heap配置的内存是1.42GB。当前Task Manager没有运行任何的Job,一共只占用了62MB的内存。
Framework Heap
Framework heap其实是为Task Executor本身所配置的堆内存大小,因为Task Executor本身也是一个Java。Framework Heap是Flink框架保留的,是不会用来执行Task的。该堆的大小由taskmanager.memory.framework.heap.size参数控制,它的运行所需资源比较轻量级,默认为128M。
Task Heap
Task Heap Memory是专门用于执行Flink任务的堆内存空间。该堆的大小由taskmanager.memory.task.heap.size参数指定。这个参数的默认为:Total Flink Memory – Framework Heap – Task off-heap memory – Managed Memory – Network Memory。
Managed Memory Off-Heap Memory
Memory是由Flink直接管理的off-heap内存,它主要用于排序、哈希表、中间结果缓存、RocksDB的backend。其实它是Task Executor管理的off-heap内存。它可以由taskmanager.memory.managed.size 参数直接配置指定,默认是不配置的。默认是通过
taskmanager.memory.managed.fraction配置的因子(默认0.4)来设置Managed off-heap memory,默认为Total Flink Memory的40%。
Direct Memory
Framework Off-heap Memory
Task Executor保留的off-heap memory,不会分配给任何slot。可以通过taskmanager.memory.framework.off-heap.size参数指定,默认为128M。
Framework所保留的内存,一般是不建议调整的。
Task Off-heap Memory
Task Executor执行的Task所使用的堆外内存。如果在Flink应用的代码中调用了Native的方法,需要用到off-heap内存,这些内存会分配到Off-heap堆外内存中。可以通过指定taskmanager.memory.task.off-heap.size来配置,默认为0。如果代码中需要调用Native Method并分配堆外内存,可以指定该参数。一般不使用,所以大多数时候可以保持0。
Network Memory
Network Memory使用的是Directory memory,在Task与Task之间进行数据交换时(shuffle),需要将数据缓存下来,缓存能够使用的内存大小就是这个Network Memory。它由是三个参数决定:
taskmanager.memory.network.min:默认为64MB
taskmanager.memory.network.max:默认为1gb
taskmanager.memory.network.fraction:默认为0.1
Network Memory有两种配置方式,一种是通过taskmanager.memory.network.fraction参数,也就是Total Flink Memory的百分比,默认为Total Flink Meory的10%。还有一种是通过taskmanager.memory.network.min和taskmanager.memory.network.max指定shuffle缓存在min-max之间的内存空间。如果使用fraction计算出来的Network Meory超出min-max的范围,那么以min-max为准。如果配置的min和max是一样的值,就使用固定的内存大小。
我们来算一下:当前的Total Flink Meory为:3425MB,而network.fraction为0.1,min为64MB,max为1GB,所以好Network Meory在min-max之间,为343MB。
JVM Metaspace Memory
从JDK 8开始,JVM把永久代拿掉了。类的一些元数据放在叫做Metaspace的Native Memory中。在Flink中的JVM Metaspace Memory也一样,它配置的是Task Manager JVM的元空间内存大小。通过taskmanager.memory.jvm-metaspace.size参数配置,默认为256MB。
JVM Overhead
保留给JVM其他的内存开销。例如:Thread Stack、code cache、GC回收空间等等。和Network Memory的配置方法类似。它也由三个配置决定:
taskmanager.memory.jvm-overhead.min:默认为192MB,
taskmanager.memory.jvm-overhead.max:默认为1GB
taskmanager.memory.jvm-overhead.fraction:默认为0.1。
1. 什么是托管内存(Managed Memory)
托管内存由 Flink 管理,并作为本机内存(堆外)进行分配。以下工作负载使用托管内存:
流作业可以将其用于 RocksDB 状态后端。
流式处理和批处理作业都可以使用它进行排序、哈希表、中间结果的缓存。
流处理和批处理作业都可以使用它在 Python 进程中执行用户定义的函数。
2. 框架堆外内存和任务堆外内存的区别是什么?
任务堆外内存为由用户代码分配的堆外内存, 给Flink框架分配的堆外内存为框架堆外内存
3. JobManager的JVM Heap用途和配置依据是什么?
JVM Heap的用途:
Flink框架
在作业提交期间(例如,对于某些批处理源)或在Checkpoint完成回调中执行的用户代码
JVM 堆所需的大小主要由正在运行的作业数量、作业结构以及上述用户代码的要求决定。
4. JobManager的堆外内存用途是什么?
JobManager的堆外内存用途:
Flink 框架网络通信等使用直接内存的地方
在作业提交期间(例如,对于某些批处理源)或在检查点完成回调中执行的用户代码
5. 如果出现OutOfMemoryError: Java heap space怎么办?
该异常通常表明 JVM Heap 太小。您可以尝试通过增加总内存来增加 JVM 堆大小。您还可以直接增加 TaskManager 的任务堆内存或 JobManager 的 JVM 堆内存。
您也增加 TaskManager 的框架堆内存,但只有在确定 Flink 框架本身需要更多内存时才应更改此选项。
6. 如果出现OutOfMemoryError: Direct buffer memory怎么办?
该异常通常表明JVM直接内存限制太小或者存在直接内存泄漏。检查用户代码或其他外部依赖项是否使用 JVM 直接内存以及是否已正确说明。您可以尝试通过调整直接堆外内存来增加其限制。
7. 如果出现OutOfMemoryError: Metaspace怎么办?
该异常通常表明JVM元空间限制配置得太小。您可以尝试增加TaskManagers或JobManagers的JVM元空间选项。
8. 如果出现IOException: Insufficient number of network buffers怎么办?
这仅与TaskManagers有关,该异常通常表明配置的网络内存大小不够大。您可以尝试通过调整网络参数来增加网络内存。
9. 如果Yarn或者Kubernetes出现Container Memory Exceeded怎么办?
如果 Flink 容器尝试分配超出其请求大小(Yarn 或 Kubernetes)的内存,这通常表明 Flink 没有预留足够的本机内存。您可以通过使用外部监控系统或从容器被部署环境终止时的错误消息来观察这一点。
如果您在JobManager进程中遇到此问题,还可以通过设置 jobmanager.memory.enable-jvm-direct-memory-limit 选项来启用JVM Direct Memory限制,以排除可能的JVM Direct Memory泄漏。
如果使用RocksDBStateBackend:
并且内存控制被禁用:您可以尝试增加TaskManager的托管内存。
并且内存控制已启用,并且在保存点或完整检查点期间非堆内存增加:这可能是由于 glibc 内存分配器导致的(请参阅 glibc bug)。您可以尝试为TaskManagers添加环境变量 MALLOC_ARENA_MAX=1 。
或者,您可以增加 JVM 开销。
1)JVM 特定内存:JVM 本身使用的内存 ,包含 JVM 的 metaspace 和 over-head
1.JVM metaspace:JVM 元空间
taskmanager.memory.jvm-metaspace.size ,默认 256mb
2.JVM over-head 执行开销:JVM 执行时自身所需要的内存 ,包括线程堆栈、IO、编译缓存等所使用的内存。
taskmanager.memory.jvm-overhead.fraction ,默认 0.1
taskmanager.memory.jvm-overhead.min ,默认 192mb
taskmanager.memory.jvm-overhead.max ,默认 1gb
总进程内存*fraction ,如果小于配置的 min(或大于配置的 max)大小 ,则使用 min/max大小
2)框架内存:Flink 框架 ,即 TaskManager 本身所占用的内存 ,不计入 Slot 的资源中。
堆内:taskmanager.memory.framework.heap.size ,默认 128MB
堆外:taskmanager.memory.framework.off-heap.size ,默认 128MB
3)Task 内存:Task 执行用户代码时所使用的内存
堆内:taskmanager.memory.task.heap.size ,默认 none ,由 Flink 内存扣除掉其他部分的内存得到。
堆外:taskmanager.memory.task.off-heap.size ,默认 0 ,表示不使用堆外内存
4)网络内存:网络数据交换所使用的堆外内存大小 ,如网络数据交换缓冲区
堆外:taskmanager.memory.network.fraction ,默认 0.1
taskmanager.memory.network.min ,默认 64mb
taskmanager.memory.network.max ,默认 1gb
Flink 内存*fraction ,如果小于配置的 min(或大于配置的 max)大小 ,则使用 min/max大小
5)托管内存:用于 RocksDB State Backend 的本地内存和批的排序、哈希表、缓存中间结果
堆外:taskmanager.memory.managed.fraction ,默认 0.4
taskmanager.memory.managed.size ,默认 none
如果 size 没指定 ,则等于 Flink 内存*fraction
案例分析
基于Yarn模式 ,参数指定总进程内存 ,taskmanager.memory.process.size ,比如指定为 4G ,每一块内存得到大小如下:
1)计算 Flink 内存
JVM 元空间 256m
JVM 执行开销: 4g*0.1=409.6m ,在[192m,1g]之间 ,最终结果 409.6m
Flink 内存=4g-256m-409.6m=3430.4m
网络内存=3430.4m*0.1=343.04m ,在[64m,1g]之间 ,最终结果 343.04m
托管内存【管理内存-RocksDB】=3430.4m*0.4=1372.16m
框架内存 ,堆内和堆外都是 128m
Task 堆内内存=3430.4m-128m-128m-343.04m-1372.16m=1459.2m
Flink 是实时流处理 ,关键在于资源情况能不能抗住高峰时期每秒的数据量 ,通常 TaskManager:2g~8g ,JobManager保持默认值即可 ,不断调整得出最优值。
2)合理利用CPU资源
Yarn 的容量调度器默认情况下是使用 "DefaultResourceCalculator" 分配策略 ,只根据内存调度资源 ,所以在 Yarn 的资源管理页面上看到每个容器的 vcore 个数还是 1。
可以修改策略为 DominantResourceCalculator ,该资源计算器在计算资源的时候会综合考虑 cpu 和内存的情况。
在 capacity-scheduler.xml 中修改属性:
<property>
<name>yarn.scheduler.capacity.resource-calculator</name>
<!-- <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value> -->
<value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
1.使用 DefaultResourceCalculator 策略
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c *.Demo \
*.jar
总计:4个Container ,4个Vcores
计算过程:JobManager:1个container ,1个vcores ,最大并行度为5->5个slot ,每个TaskManager有2个slot则需要3个TaskManager ,则3个container ,3个vcores。
2.使用 DominantResourceCalculator 策略
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c *.Demo \
*.jar
总计:4个Container ,7个Vcores
计算过程:JobManager:1个container ,1个vcores ,最大并行度为5->5个slot ,每个TaskManager有2个slot则需要3个TaskManager ,则3个container ,每个 container 2个 vcores 则需要 2*3 个vcores【默认单个container的 vcore数=单TM的slot数】。
3.使用 DominantResourceCalculator 策略并指定容器 vcore 数
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Dyarn.containers.vcores=3 \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c *.Demo \
*.jar
总计:4个Container ,10个Vcores
计算过程:JobManager:1个container ,1个vcores ,最大并行度为5->5个slot ,每个TaskManager有2个slot则需要3个TaskManager ,则3个container ,3*3个vcores。
并行度设置
1)全局并行度计算
开发完成后 ,先进行压测。任务并行度给 10 以下 ,测试单个并行度的处理上限。然后【以峰值计算】 总 QPS/单并行度的处理能力 = 并行度。
先在 kafka 中积压数据 ,之后开启 Flink 任务 ,出现反压 ,就是处理瓶颈。
不能只从 QPS 去得出并行度 ,因为有些字段少、逻辑简单的任务 ,单并行度一秒处理几万条数据。而有些数据字段多 ,处理逻辑复杂 ,单并行度一秒只能处理 1000 条数据 ,最好根据高峰期的 QPS 压测 ,并行度*1.2 倍 ,富余一些资源。
2)Source 端并行度的配置
数据源是 Kafka Source 的并行度设置为 Kafka 对应 Topic 的分区数。如果已经等于 Kafka 的分区数 ,消费速度仍跟不上数据生产速度 ,考虑下 Kafka 要扩大分区 ,同时调大并行度等于分区数。
Flink 的一个并行度可以处理一至多个分区的数据 ,如果并行度多于 Kafka 的分区数 ,那么就会造成有的并行度空闲 ,浪费资源。
3)Transform 端并行度的配置
➢ Keyby 之前的算子
一般不会做太重的操作 ,比如 map、filter、flatmap 等处理较快的算子 ,并行度可以和 source 保持一致。
➢ Keyby 之后的算子
如果并发较大 ,建议设置并行度为 2 的整数次幂 ,例如:128、256、512;
小并发任务的并行度不一定需要设置成 2 的整数次幂;
大并发任务如果没有 KeyBy ,并行度也无需设置为 2 的整数次幂;
4)Sink 端并行度的配置
根据 Sink 端的数据量及下游的服务抗压能力进行评估。如果 Sink 端是 Kafka ,可以设为 Kafka 对应 Topic 的分区数。
Source 端的数据量是最小的 ,拿到 Source 端流过来的数据后做了细粒度拆分 ,数据量不断增加 ,到 Sink 端的数据量就非常大。那么在 Sink 到下游的存储中间件的时候就需要提高并行度。
另外 Sink 端要与下游的服务进行交互 ,并行度还得根据下游的服务抗压能力来设置 ,如果在 Flink Sink 端的数据量过大的话 ,且 Sink 并行度也设置的很大 ,但下游的服务完全撑不住这么大的并发写入 ,可能会造成下游服务直接被写挂 ,最终还是要在此处的并行度做一定的权衡。