4. Flink 算子与并行度

Flink 算子与并行度

Flink 数据并行

每一个算子(Operator)可以包含一个或多个子任务(Operator Subtask),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。比如 map 算子要处理 10G 的数据,现在拆分成两个 5G 的数据,然后这两个 5G 的数据分别作用 map,那么此时 map 算子的并行度就是 2。

不同的算子可能具有不同的并行度,比如图中的 sink 算子的并行度是 1,其它算子的并行度是 2。一般情况下,一个流程序的并行度,可以认为是其所有算子中最大的并行度。并行度代表了子任务的数量,包含并行子任务的数据流,就是并行数据流,它需要多个分区(Stream Partition)来分配并行任务。

那么我们如何设置并行度呢?有三种方式,优先级从高到低。
1) 通过代码设置

    from pyflink.datastream import StreamExecutionEnvironment

    # 该方法返回一个 StreamExecutionEnvironment 对象,它是 Flink 程序的入口点、或者说执行环境
    # 不管使用什么 API,StreamExecutionEnvironment 对象都是必须的
    env = StreamExecutionEnvironment.get_execution_environment()
    # 然后我们可以设置一些参数,比如并行度、最大并行度等等,有很多参数可以设置
    # 直接输入 env.set_ ,然后 PyCharm 会自动提示,比如我们来设置一下并行度
    env.set_parallelism(2)
    # 再比如指定解释器的版本,这里指定为 Python3
    # 当然,如果你在终端输入 python 指向的就是 Python3,那么也可以不用设置
    env.set_python_executable("python3")  # 指定完整路径也是可以的

2) 提交应用时设置

  • 在使用 flink run 命令提交应用时,可以通过 -p 参数指定当前应用程序执行的并行度。
    3) 通过配置文件设置
  • 配置文件 flink-conf.yaml 中直接更改默认并行度, parallelism.default: 2

算子链(Operator Chain)

一个数据流在算子之间的传输形式可以是一对一的直通模式(forwarding),也可以是打乱的重分区模式(redistributing),具体是哪一种模式,取决于算子的种类。

在一对一模式下,数据流维护着分区以及元素的顺序,比如上图图中的 source 和 map 算子。source 算子读取数据之后,可以直接发送给 map 算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序。这就意味着 map 算子的子任务看到的元素个数和顺序,跟 source 算子的子任务是完全一样的,保证一对一的关系。map、filter、flatMap 等算子都是这种一对一的对应关系,这种关系就类似于 Spark 当中的窄依赖。

而在重分区模式下,数据流的分区会发生改变。比如图中的 map 和后面的 keyBy / window 算子之间,以及 keyBy / window 算子和 sink 算子之间,都是这样的关系。这些算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。这种传输方式会引起重分区的过程,类似于 Spark 中的宽依赖(Shuffle)。

在 Spark 中,一个 Job 会基于宽依赖划分为多个 Stage,保证每个 Stage 里的算子操作都是窄依赖的,不会出现 Shuffle。然后在一个 Stage 中,每个分区上的所有算子都由同一个线程执行。对于 Flink 也是如此,图中的 source、map、filter 算子都是窄依赖的,每个分区操作都是独立的。但像 keyBy 这种宽依赖的算子则不同,因为 keyBy 是按 key 聚合,而不同分区可能会包含具有相同 key 的数据,因此就会涉及数据的传输,此时我们也说产生了 Shuffle。

而对于这些窄依赖的算子,完全可以将它们合并在一起,由同一个线程执行,这样可以减少线程之间的切换和基于缓存区的数据交换,从而降低延迟并提升吞吐量。由于图中有 9 个算子,本来应该产生 9 个任务,但 source、map、filter 合并在了一起,所以最终的任务数是 5,由 5 个线程执行。

算子的合并在 Flink 中是默认行为,而我们也可以选择不合并。为了效率,我们一般会默认选择合并。

# 禁用算子链
data.map(lambda x: x).disable_chaining()
# 从当前算子开始新链
data.map(lambda x: x).start_new_chain()

任务槽(Task Slot)

Flink 中每一个 TaskManager 都是一个 JVM 进程,它可以启动多个独立的线程,来并行执行多个子任务(Subtask)。但 TaskManager 的计算资源是有限的,并行的任务越多,每个线程的资源就会越少。为了控制并发量,我们需要在 TM 上对每个任务运行所占用的资源做出明确划分,这就是所谓的任务槽。每个任务槽(Task Slot)其实就是 TM 计算资源的一个固定大小的子集,用来独立执行子任务。

假如一个 TM 有三个 Slot,那么它会将管理的内存划分为三份,每个 Slot 独占一份。这样的话,在 Slot 上执行子任务时,相当于划定了一块专属的内存,就不用和其它任务竞争资源了。所以如果想处理上面的 5 个子任务,只需要分配两个 TM 就好了。

那么 Task Slot 的数量应该如何设置呢?在 flink-conf.yaml 里面有一个配置参数:

# 指定每个 TM 的 Task Slot 数量,默认为 1
taskmanager.numberOfTaskSlots: 8

需要注意的是,Slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。所以在具体应用时,可以将 Slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争,这也是开发环境默认并行度设置为机器 CPU 核心数的原因。

Flink 默认是允许 Slot 共享的,但如果希望某个算子对应的任务完全独占一个 Slot,或者只有某一部分算子共享 Slot,我们也可以通过设置 Slot 共享组手动指定。

data.map(lambda x: x).slot_sharing_group("1")

这样只有属于同一个 Slot 共享组的子任务,才会开启 Slot 共享,而不同组之间的任务是完全隔离的,必须分配到不同的 Slot 上。在这种场景下,总共需要的 Slot 数量,就是各个 Slot 共享组最大并行度的总和。

任务槽和并行度的关系

任务槽和并行度都跟程序的并行执行有关,但两者是完全不同的概念。简单来说任务槽是静态的概念,是指 TM 具有的并发执行能力,可以通过参数 taskmanager.numberOfTaskSlots 进行配置;而并行度是动态的概念,也就是 TM 运行程序时实际使用的并发能力,可以通过参数 parallelism.default 进行配置(或者通过代码、命令行提交应用进行设置)。

设置某个算子的并行度
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# 设置全局并行度,默认情况下所有算子的并行度都是 10
env.set_parallelism(10)
env.set_python_executable("python3")
# 创建一个 DataStream
data = env.from_collection([1, 2, 3, 4, 5])
# 单独设置 map 算子的并行度为 2
data.map(lambda x: x + 2).set_parallelism(2)