DataStream API
DataStream API 是 Flink 的核心层 API,一个 Flink 程序,其实就是对 DataStream 做各种转换。具体来说,代码基本上都由以下几部分构成:
- 创建一个执行环境(StreamExecutionEnvironment)
- 加载/创建初始数据(Source)
- 对数据进行转换(Transformation)
- 输出结果(Sink)
- 触发执行(execute)
执行环境(Execution Environment)
Flink 程序可以在各种上下文环境中运行,比如我们可以在本地 JVM 中执行程序,也可以提交到远程集群上运行,一会儿在学习算子的时候就直接在本地 JVM 中运行。不同的环境,代码提交运行的过程会有所不同,这就要求我们在提交作业执行计算时,首先必须获取当前 Flink 的运行环境,从而建立起与 Flink 框架之间的联系。
from pyflink.datastream import StreamExecutionEnvironment
# 创建执行环境
env = StreamExecutionEnvironment.get_execution_environment()
# 设置执行环境的一些属性
env.set_parallelism(10)
env.set_python_executable("python3")
关于批处理,官方推荐的做法是直接使用 DataStream API,不建议使用 DataSet API(已经废弃)。因为 DataStream 已经是批流一体,只需在提交任务时将执行模式设置为 BATCH,便可实现批处理。而执行模式,DataStream 支持三种:
流执行模式(Streaming):
这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 Streaming 执行模式。批执行模式(Batch):
专门用于批处理的执行模式。自动模式(AutoMatic):
在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
而设置执行模式,主要有两种方式。
1) 通过命令行设置:flink run -Dexecution.runtime-mode=BATCH ···,在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH。
2) 通过代码设置。
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.execution_mode import RuntimeExecutionMode
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(10)
env.set_python_executable("python3")
# 设置执行模式为 BATCH,当然也可以是 STREAMING 和 AUTOMATIC
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
# env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
# env.set_runtime_mode(RuntimeExecutionMode.AUTOMATIC)
源算子
Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator),所以 Source 就是我们整个处理程序的输入端。
从本地集合中读取数据
# 创建 DataStream 最简单的方式,直接调用执行环境的 from_collection 方法,即可将本地集合转成 DataStream。
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
# 创建执行环境
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# 定义数据集合
data = ['Alice', 'Bob', 'John', 'Mary']
# 从集合中读取数据创建表
t_env.create_temporary_view(
"People",
t_env.from_elements(data, DataTypes.STRING())
)
# 执行查询
t_env.select_sql("SELECT * FROM People").execute().print()
# 执行环境的执行
env.execute()
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.execution_mode import RuntimeExecutionMode
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# 从本地集合读取数据,显然数据是有界的,因此对应批处理
# 而 DataStream 默认是流处理,所以这里要将执行模式设置为 BATCH
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
# 基于列表创建 DataStream
ds = env.from_collection([1, 2, 3, 4, 5])
print(ds)
# 调用 print 方法打印数据
ds.print()
# 我们说 Flink 的算子是懒加载的,需要调用执行环境的 execute 方法才会触发执行
env.execute(job_name="from_collection")
"""
<pyflink.datastream.data_stream.DataStream object at 0x00000212CB896110>
1
2
3
4
5
"""