2. Flink On Yarn

Flink On Yarn

资源管理层面

  • Resource Manager: 管理整个集群资源,相当于 Master,后续简称 RM。
  • Node Manager: 管理所在节点的资源,相当于 Worker,后续简称 NM。
    所以 NM 负责管理单个节点,而每个节点上都有一个 NM,那么多个 NM 就能将所有节点的资源都管理起来,然后这些 NM 统一去向 RM 进行汇报。所以整个资源管理,就是 RM 配合一堆 NM 完成的。但是光有资源还不够,因为最终的目的还是要完成计算的,所以必须要有干活的。

任务计算层面

  • Application Master(后续简称 AM): 任务的管理者,当任务在 NM 上运行的时候,就是由 AM 负责管理,比如任务失败重启,任务资源分配,任务的工作调度,每个任务都对应一个 AM。
  • Task: 任务的执行者,真正用来干活的。
  • Container: 这个图上没有画,但 AM 和 Task 都运行在 Container 里面。在 YARN 中它代表了资源的抽象,封装了节点上的多维度资源,如内存、CPU、磁盘、网络等等。

Flink on Yarn

  • Per-Job 模式: 每次提交作业时,Flink 都会为该作业启动一个新的独立集群,等到作业运行结束后再关闭集群。这种模式确保了隔离性,每个作业都在自己的 Flink 集群上运行,不会和其它作业相互干扰。所以 Per-Job 模式特别适合资源敏感和需要隔离的长时间运行的作业,但如果有很多短时间的作业需要运行,那么频繁地启动和停止集群将会导致大量的资源浪费和时间开销。
  • Application 模式: 类似于 Per-Job 模式,因为在这种模式下,Flink 集群的生命周期也是与单个应用程序的生命周期绑定的。不过 Application 模式通常更适合云环境,以及使用 YARN、Kubernetes 做资源管理,其中集群的启动和应用程序的提交可以被打包成一个单一的步骤。这种模式同样适合资源敏感和需要隔离的作业,并且还包含所有依赖的时候。
  • Session 模式下: 会先启动一个 Flink 集群,然后等待作业提交。这种模式适合于多个作业需要反复或同时运行在同一个 Flink 集群上的场景,因为它可以避免每次作业运行时都重复启动和停止集群的开销。

Resource Manager

1) 处理客户端请求。客户端想访问集群,比如提交一个应用程序,或者说作业(可以是 Spark 作业,也可以是 MapReduce 作业) ,要经过 Resource Manager,它是整个资源的管理者,管理整个集群的 CPU、内存、磁盘等资源;
2) 监控 Node Manager;
3) 启动或监控 Application Master;
4) 资源的分配和调度;

Node Manager

1) 管理单个节点上的资源,Node Manager 是当前节点资源的管理者,当然它也需要跟 Resource Manager 汇报;
2) 处理来自 Resource Manager 的命令,比如启动 Container 运行 Application Master;
3) 处理来自 Application Master 的命令,比如启动 Container 运行 Task;

Application Master

1) 某个任务的管理者。当任务在 Node Manager 上运行的时候,就是由 Application Master 负责管理,每个任务都会对应一个 AM。当然 JM 也是如此,它也对应一个 AM;
2) 负责数据的切分;
3) 为应用程序向 RM 申请资源,并分配给内部的任务;
4) 任务的监控与容错;

Task

1) 任务的实际执行单元,运行在 AM 申请到的 Container 中。每个任务独占一个 Container,从而实现有效的资源管理和隔离。
所以整个 YARN 的流程应该不复杂,RM 管理全局资源,NM 管理单个节点资源,AM 管理单个任务,Task 负责执行任务,Container 代表了资源的抽象,AM 和 Task 都运行在 Container 中。因为 Container 是资源的抽象,所以不光是 Task,JobManager 也运行在 Container 中,也对应一个负责管理的 AM。

那么 Flink on YARN 都有哪些优势呢?

  • 可以和现有的大数据平台无缝对接(Hadoop 需要 2.4 版本以上)
  • 部署集群与任务提交都非常简单
  • 资源管理统一通过 YARN,提升整体资源利用率
  • 基于 Native 方法,TM 可以按需申请和启动,从而防止资源浪费
  • 借助于 Hadoop YARN 提供的自动 failover 机制,可以保证容错,能保证 JM、TM 节点从异常中正常恢复

环境配置

vim /etc/profile

export HADOOP_CONF_DIR=/etc/hadoop/conf
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
export FLINK_HOME=/usr/local/flink-1.18.1
export PATH=$FLINK_HOME/bin:$PATH

source /etc/profile

vim bin/pyflink-shell.sh
PYFLINK_PYTHON="${PYFLINK_PYTHON:-"python3"}"

yum install -y xz-devel mesa-libGL python-backports-lzma
cd /usr/local/python-3.8.20
./configure --prefix=/usr/local/python-3.8.20
make
sudo make altinstall
/usr/local/python-3.8.20/bin/python3.8 -m pip install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple

# 安装依赖
pip3 install apache-flink=1.18.1 -i https://pypi.tuna.tsinghua.edu.cn/simple

# 创建历史服务日志目录
hdfs dfs -mkdir -p /user/flink/flink-job
hdfs dfs -chmod 777 -R /user/flink

flink 配置

# cat flink-conf.yaml
python.client.executable: /usr/local/python-3.8.20/bin/python3.8
env.java.home: /usr/local/jdk-11.0.18
containerized.master.env.JAVA_HOME: /usr/local/jdk-11.0.18
containerized.taskmanager.env.JAVA_HOME: /usr/local/jdk-11.0.18
env.java.opts.all: -XX:MaxDirectMemorySize=8G --add-exports=java.base/sun.net.util=ALL-UNNAMED \
    --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED \
    --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED \
    --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED \
    --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED \
    --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED \
    --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED \
    --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED \
    --add-opens=java.base/java.lang=ALL-UNNAMED \
    --add-opens=java.base/java.net=ALL-UNNAMED \
    --add-opens=java.base/java.io=ALL-UNNAMED \
    --add-opens=java.base/java.nio=ALL-UNNAMED \
    --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
    --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
    --add-opens=java.base/java.text=ALL-UNNAMED \
    --add-opens=java.base/java.time=ALL-UNNAMED \
    --add-opens=java.base/java.util=ALL-UNNAMED \
    --add-opens=java.base/java.util.concurrent=ALL-UNNAMED \
    --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED \
    --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
jobmanager.rpc.address: 172.16.100.41
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 10240m
taskmanager.bind-host: 0.0.0.0
taskmanager.host: 172.16.100.41
taskmanager.memory.process.size: 20480m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 8
jobmanager.execution.failover-strategy: region
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.min: 1024mb
taskmanager.memory.network.max: 10gb
resourcemanager.taskmanager-timeout: 900000

配置历史服务器

# 历史作业的数据存储位置
jobmanager.archive.fs.dir: hdfs://ns1:8020/user/flink/flink-job
# 历史服务器监听的 IP 和端口
historyserver.web.address: 172.16.100.41
historyserver.web.port: 9001
# 历史服务器去哪个目录查询历史作业,和 jobmanager.archive.fs.dir 是一致的
historyserver.archive.fs.dir: hdfs://ns1:8020/user/flink/flink-job
# 历史作业的数据是不断更新的,那么历史服务器多久检测一次呢?默认 10 秒
historyserver.archive.fs.refresh-interval: 10000

## 配置完成后,我们启动历史服务器。
# 启动历史服务器:
historyserver.sh start
# 关闭历史服务器:
historyserver.sh stop

# # JobManager
# jobmanager.memory.process.size = jobmanager.memory.flink.size + jobmanager.memory.jvm-metaspace.size + jobmanager.memory.jvm-overhead.size
# jobmanager.memory.flink.size = jobmanager.memory.heap.size + jobmanager.memory.off-heap.size

# # TaskManager
# taskmanager.memory.process.size = taskmanager.memory.flink.size + taskmanager.memory.jvm-metaspace.size + taskmanager.memory.jvm-overhead.size
# taskmanager.memory.flink.size = JVM堆内存 + taskmanager.memory.managed.size + 直接内存
# JVM堆内存 = taskmanager.memory.task.heap.size + taskmanager.memory.framework.heap.size
# 直接内存 = taskmanager.memory.framework.off-heap.size + taskmanager.memory.task.off-heap.size + 网络内存
# flink on yarn 企业生产环境运行flink任务大多数的选择
# 好处:集群资源由yarn集群统一调度和管理 ,提高利用率 ,flink中jobmanager的高可用操作就由yarn集群来管理实现。
# 主要是在yarn-site.xml中配置关闭内存校验,否则flink任务可能会因为内存超标而被yarn集群主动杀死
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>

### flink on yarn

## session 会话模式
# 使用yarn-session.sh命令申请资源初始化一个flink集群
bin/yarn-session.sh -d -p 8 -s 8 -jm 8192 -tm 20480 -nm test
# -d:分离模式 ,如果你不想让Flink YARN客户端一直前台运行 ,可以使用这个参数 ,即使关掉当前对话窗口 ,YARN session也可以后台运行。
# -jm(–jobManagerMemory):配置JobManager所需内存 ,默认单位MB。
# -nm(–name):配置在YARN UI界面上显示的任务名。
# -qu(–queue):指定YARN队列名。
# -tm(–taskManager):配置每个TaskManager所使用内存。
# -s 表示每个TaskManager的slots数量
/usr/local/flink-1.18.1/bin/flink run /usr/local/flink-1.18.1/examples/batch/WordCount.jar
# 停止 flink on yarn 会话模式中的flink集群
yarn application -kill application_XXXX_YY
# 会话模式这种方式的优缺点:
# 缺点:1 会一直有一个程序运行在yarn集群中 ,不管有没有任务提交执行 ,浪费资源 ,
# 优点:flink 集群环境是提前准备好的不需要为每个作业单独创建flink环境
# 适用场景:大量的小作业的时候可以考虑使用这种方式
# python脚本提交
export PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3
/usr/local/flink-1.18.1/bin/flink run \
  -Djobmanager.memory.process.size=4096mb \
  -Dtaskmanager.memory.process.size=8192mb \
  -Dtaskmanager.numberOfTaskSlots=2 \
  -Dtaskmanager.memory.task.off-heap.size=2048mb \
  -Dtaskmanager.memory.framework.off-heap.size=2048mb \
  -pyexec /usr/local/python-3.8.20/bin/python3.8  \
  -py /usr/local/flink-1.18.1/examples/python/table/basic_operations.py
# --pyFiles 提交主py文件依赖,指定python文件,可以是文件也可以是目录,如果是目录会递归加载目录下的所有文件,也可以将依赖打包成一个 .zip 或 .egg 文件
# -pyarch 表示虚拟环境的归档文件
# -pyexec 表示集群中 TaskManager 执行 Python 代码时的解释器路径,venv.zip 解压之后的 venv/bin/python
# -pyclientexec 表示提交作业的客户端上执行 Python 代码时的解释器路径

## 单作业模式部署 ,高版本中已经弃用
bin/flink run -m yarn-per-job -ys 8 -yjm 8192 -ytm 10240 /usr/local/flink-1.18.1/examples/batch/WordCount.jar
# yjm: jobmanager 内存
# ytm:taskmanager 内存
# ys:taskmanager slot
# yn: taskmanger 数量  1.8版本以后已弃用
# -p: 指定提交作业时的默认并行度。
# 提交任务之后会在yarn集群按照我们的配置初始化一个flink集群 ,运行我们提交的作业 ,作业执行完成之后就释放资源关闭掉flink集群 ,把资源还给yarn集群。
# 总结:
# 优点:随到随用 ,只有任务需要运行时才会开启flink集群;运行完就关闭释放资源 ,资源利用更合理;
# 缺点:对于小作业不太友好 ,
# 适用场景:适合大作业 ,长时间运行的大作业。
# 取消任务命令执行后对应的 Flink集群也会停止 :
bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

## Yarn Application模式
# Yarn Application 与Per-Job 模式类似 ,只是提交任务不需要客户端进行提交 ,直接由JobManager来进行任务提交 ,
# 每个Flink Application对应一个Flink集群 ,如果该Flink Application有多个job任务 ,所有job任务共享该集群资源 ,
# TaskManager也是根据提交的Application所需资源情况动态进行申请。
scp -r /usr/local/jdk-11.0.18 kube-40:/usr/local/
scp -r /usr/local/jdk-11.0.18 kube-42:/usr/local/
scp -r /usr/local/jdk-11.0.18 kube-39:/usr/local/
# Yarn Application模式提交任务命令
./bin/flink run-application \
    -t yarn-application \
    -Djobmanager.memory.process.size=2048m \
    -Dtaskmanager.memory.process.size=4096m \
    /usr/local/flink-1.18.1/examples/batch/WordCount.jar
# 查看Flink 集群中的Job作业:
bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
#取消Flink集群中的Job作业:
bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
#停止集群 ,当取消Flink集群中所有任务后 ,Flink集群停止 ,也可以使用yarn application -kill ApplicationID 停止集群

## 其他参数参考
./bin/flink run \
    # 指定yarn的Per-job模式,-t等价于-Dexecution.target
    ## yarn-per-job
    ## yarn-session
    ## yarn-application
    -t yarn-per-job \
    # yarn应用的自定义name
    -Dyarn.application.name=wordcount \
    # 未指定并行度时的默认并行度值, 该值默认为1
    -Dparallelism.default=3 \
    # JobManager堆的内存
    -Djobmanager.memory.process.size=2048mb \
    # TaskManager堆的内存
    -Dtaskmanager.memory.process.size=2048mb \
    # 每个TaskManager的slot数目, 最佳配比是和vCores保持一致
    -Dtaskmanager.numberOfTaskSlots=2 \
    # 防止日志中文乱码
    -Denv.java.opts="-Dfile.encoding=UTF-8" \
    # 支持火焰图, Flink1.13新特性, 默认为false, 开发和测试环境可以开启, 生产环境建议关闭
    -Drest.flamegraph.enabled=true \
    # 入口类
    -c xxxx.MainClass \
    # 提交Job的jar包
    xxxx.jar

## 运行python脚本示例
/usr/local/flink-1.18.1/bin/flink run -pyexec /usr/bin/python3 -py /usr/local/flink-1.18.1/examples/python/datastream/word_count.py
/usr/local/flink-1.18.1/bin/flink run-application \
    -t yarn-application \
    -Djobmanager.memory.process.size=2048m \
    -Dtaskmanager.memory.process.size=4096m \
    -pyexec /usr/bin/python3 -py /usr/local/flink-1.18.1/examples/python/table/word_count.py