Flink On K8s
Flink K8s deployment
# Flink Kubernetes Operator
# Install cert-manager on the Kubernetes cluster so that the webhook component can be added (needed only once per Kubernetes cluster):
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
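# If the cert-manager installation fails for any reason, you can disable the webhook by passing --set webhook.create=false to the operator's helm install command.
# Now you can deploy the selected stable Flink Kubernetes Operator release using the included Helm chart.
# Source archive for this release: https://dlcdn.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-src.tgz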
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
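# For the list of stable releases, see https://flink.apache.org/downloads.html
# By default the Helm chart pulls the operator image from ghcr.io/apache/flink-kubernetes-operator. If you have connectivity problems or prefer Docker Hub, add this flag to the helm install command:
--set image.repository=apache/flink-kubernetes-operator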
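# The two optional flags mentioned above can be combined with the base install command. The following is a minimal sketch, not part of the operator tooling: the function name operator_install_cmd and its option words no-webhook and dockerhub are hypothetical names introduced for this example. It only prints the command so it can be reviewed before running.

```shell
# Build the helm install command for the operator, adding the optional flags
# described above. The option words are hypothetical names for this sketch.
operator_install_cmd() {
  local cmd="helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator"
  local opt
  for opt in "$@"; do
    case "$opt" in
      # cert-manager is unavailable: install the operator without the webhook
      no-webhook) cmd="$cmd --set webhook.create=false" ;;
      # pull the operator image from Docker Hub instead of ghcr.io
      dockerhub) cmd="$cmd --set image.repository=apache/flink-kubernetes-operator" ;;
      *) echo "unknown option: $opt" >&2; return 1 ;;
    esac
  done
  printf '%s\n' "$cmd"
}

# Print the full command with both flags applied.
operator_install_cmd no-webhook dockerhub
```

# Calling the function without arguments prints the plain install command shown earlier.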
# You can verify the installation as follows:
kubectl get pods
NAME READY STATUS RESTARTS AGE
flink-kubernetes-operator-fb5d46f94-ghd8b 2/2 Running 0 4m21s
helm list
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION
flink-kubernetes-operator default 1 2022-03-09 17:39:55.461359 +0100 CET deployed flink-kubernetes-operator-1.10.0 1.10.0
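# The kubectl get pods check above can also be scripted. This is a small sketch, assuming the default table output with a header row; the function name all_pods_ready is hypothetical. It succeeds only when every listed pod is Running and its READY column shows all containers ready (for example 2/2).

```shell
# Read `kubectl get pods` table output on stdin. Exit 0 only if at least one
# pod is listed and every pod is Running with all containers ready.
all_pods_ready() {
  awk 'NR > 1 {
         seen = 1
         split($2, ready, "/")
         if ($3 != "Running" || ready[1] != ready[2]) bad = 1
       }
       END { exit (seen && !bad) ? 0 : 1 }'
}

# Demo using the sample output from this section.
all_pods_ready <<'EOF' && echo "all pods ready"
NAME READY STATUS RESTARTS AGE
flink-kubernetes-operator-fb5d46f94-ghd8b 2/2 Running 0 4m21s
EOF
```

# Against a live cluster the same check would be: kubectl get pods | all_pods_ready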
# Submitting a Flink job
# Once the operator is running as shown in the previous step, you can submit a Flink job:
kubectl create -f https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.10/examples/basic.yaml
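# You can follow the job's logs. Startup can take about a minute in a fresh environment and a few seconds afterwards. Once the job has started, checkpoints appear in the log: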
kubectl logs -f deploy/basic-example
2022-03-11 21:46:04,458 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 206 (type=CHECKPOINT) @ 1647035164458 for job a12c04ac7f5d8418d8ab27931bf517b7.
2022-03-11 21:46:04,465 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 206 for job a12c04ac7f5d8418d8ab27931bf517b7 (28509 bytes, checkpointDuration=7 ms, finalizationTime=0 ms).
2022-03-11 21:46:06,458 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 207 (type=CHECKPOINT) @ 1647035166458 for job a12c04ac7f5d8418d8ab27931bf517b7.
2022-03-11 21:46:06,483 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 207 for job a12c04ac7f5d8418d8ab27931bf517b7 (28725 bytes, checkpointDuration=25 ms, finalizationTime=0 ms).
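# To confirm that checkpoints keep completing, the log lines above can be reduced to a checkpoint id and its duration. This is a sketch that assumes the log format shown in this section; the function name completed_checkpoints is hypothetical.

```shell
# Read JobManager log lines on stdin and print "<checkpoint id> <duration in ms>"
# for every "Completed checkpoint" line. Other lines are ignored.
completed_checkpoints() {
  sed -n 's/.*Completed checkpoint \([0-9][0-9]*\) for job .*checkpointDuration=\([0-9][0-9]*\) ms.*/\1 \2/p'
}

# Demo with one of the sample log lines from this section; prints "206 7".
completed_checkpoints <<'EOF'
2022-03-11 21:46:04,465 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 206 for job a12c04ac7f5d8418d8ab27931bf517b7 (28509 bytes, checkpointDuration=7 ms, finalizationTime=0 ms).
EOF
```

# Against a live job the same filter would be: kubectl logs deploy/basic-example | completed_checkpoints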
# To expose the Flink Dashboard, you can add a port-forward rule or look at the ingress configuration options:
kubectl port-forward svc/basic-example-rest 8081
# The Flink Dashboard is now reachable at localhost:8081.
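# The forwarded port also serves Flink's REST API, so the job list can be read from the command line. This is a minimal sketch, assuming the port-forward is still running and curl is installed; the helper name flink_rest_get is hypothetical.

```shell
# GET a path from the Flink REST API. The base URL defaults to the
# port-forwarded address used in this section.
flink_rest_get() {
  local path="$1"
  local base="${2:-http://localhost:8081}"
  # -f: fail on HTTP errors, -sS: quiet but still report errors
  curl -fsS "${base}${path}"
}
```

# Example: flink_rest_get /jobs/overview returns a JSON summary of the jobs on the cluster.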
# To stop the job and delete the FlinkDeployment:
kubectl delete flinkdeployment/basic-example
# The Flink Kubernetes Operator supports two deployment modes: native (the default) and standalone.
# The deployment mode is set with the mode field of the deployment spec.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
...
spec:
  ...
  mode: standalone
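# The Flink Kubernetes Operator supports two main types of deployment: Application clusters and Session clusters. Job clusters are not supported on k8s.
# Session cluster: the conventional shared setup, where several jobs run on one cluster.
# Application cluster: gives better isolation; the cluster's lifecycle is tied to the application.
# Job cluster: suited to large, long-running jobs that need high stability (slow to start).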
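k8s high-availability environment
# Flink provides two implementations of the high-availability service:
# ZooKeeper: the ZooKeeper HA service can be used with every Flink cluster deployment. It requires a running ZooKeeper quorum.
# Kubernetes: the Kubernetes HA service works only when running on Kubernetes.
# Note: Flink persists the metadata and job artifacts until the job finishes successfully, is cancelled, or fails terminally, and then deletes them.
# Example HA configuration: Kubernetes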
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
spec:
  flinkConfiguration:
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
    web.upload.dir: /flink-data # a flink-web-upload directory is created here automatically to hold uploaded JARs; the shared storage must allow the file system to be mounted by multiple instances
  jobManager:
    replicas: 2
  taskManager:
    replicas: 2
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
      volumes:
        - name: flink-volume
          persistentVolumeClaim:
            claimName: flink-ha
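# The manifest above mounts a PersistentVolumeClaim named flink-ha, which has to exist before the pods can start. The sketch below prints one possible claim. It rests on assumptions: the 1Gi default size is a placeholder, the ReadWriteMany access mode is chosen because several pods mount the same volume, and the cluster's default storage class must support that mode. The function name flink_ha_pvc is hypothetical.

```shell
# Print a PersistentVolumeClaim manifest for the flink-ha claim.
# $1: requested size (placeholder default of 1Gi).
flink_ha_pvc() {
  local size="${1:-1Gi}"
  cat <<EOF
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-ha
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: ${size}
EOF
}

# Print the manifest for review.
flink_ha_pvc 1Gi
```

# To create the claim, pipe the output to kubectl: flink_ha_pvc 1Gi | kubectl apply -f -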
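# Example HA configuration: ZooKeeper
# Note: the Apache Flink Kubernetes Operator manages clusters and jobs through the FlinkDeployment and FlinkSessionJob kinds (flink.apache.org/v1beta1). The FlinkCluster and FlinkApplication kinds used below are not among its CRDs, so these manifests need adapting before that operator accepts them. ZooKeeper HA also needs high-availability.zookeeper.quorum set to the address of the running quorum.
vim flinkcluster.yaml
apiVersion: flink.apache.org/v1
kind: FlinkCluster
metadata:
  name: my-flink-cluster
  namespace: default
spec:
  job:
    jarURI: local:///path/to/your/flink-job.jar
  taskManager:
    replicas: 2
  jobManager:
    replicas: 2
  flinkConfiguration:
    high-availability: zookeeper
    high-availability.storageDir: hdfs:///flink/ha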
kubectl apply -f flinkcluster.yaml
# Deploying the Flink job
vim flink-job.yaml
apiVersion: flink.apache.org/v1
kind: FlinkApplication
metadata:
  name: flink-job
  namespace: default
spec:
  flinkCluster:
    name: my-flink-cluster
  job:
    jarURI: local:///path/to/your/flink-job.jar
    parallelism: 2
kubectl apply -f flink-job.yaml
# Monitoring and maintenance
vim service-monitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: flink
  labels:
    app: flink
spec:
  selector:
    matchLabels:
      app: flink
  endpoints:
    - port: http
      path: /metrics
kubectl apply -f service-monitor.yaml
Session cluster
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-session-deployment-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  serviceAccount: flink
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example
spec:
  deploymentName: basic-session-deployment-example
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
    parallelism: 4
    upgradeMode: stateless
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example2
spec:
  deploymentName: basic-session-deployment-example
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1.jar
    parallelism: 2
    upgradeMode: stateless
    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
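# Further jobs for the same session cluster follow the same pattern as the two FlinkSessionJob resources above. The sketch below generates such a manifest from a few arguments. The function name session_job_manifest and its argument order are hypothetical, and upgradeMode is fixed to stateless as in the examples.

```shell
# Print a FlinkSessionJob manifest.
# Arguments: NAME DEPLOYMENT JAR_URI [PARALLELISM] (parallelism defaults to 1).
session_job_manifest() {
  local name="$1"
  local deployment="$2"
  local jar_uri="$3"
  local parallelism="${4:-1}"
  if [ -z "$name" ] || [ -z "$deployment" ] || [ -z "$jar_uri" ]; then
    echo "usage: session_job_manifest NAME DEPLOYMENT JAR_URI [PARALLELISM]" >&2
    return 1
  fi
  cat <<EOF
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: ${name}
spec:
  deploymentName: ${deployment}
  job:
    jarURI: ${jar_uri}
    parallelism: ${parallelism}
    upgradeMode: stateless
EOF
}

# Reproduce the first session job from this section.
session_job_manifest basic-session-job-example basic-session-deployment-example \
  https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar 4
```

# To submit the job, pipe the output to kubectl apply -f - once the session cluster is running.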
Application mode
# After this custom resource (CR) is created, the operator creates:
# 1 Deployment (the JobManager, image flink:1.17)
# 1 pod (the TaskManager, also image flink:1.17; the job JAR is inside the image)
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
kubectl apply -f ${name}.yaml
kubectl port-forward svc/basic-example-rest 8081
# Open http://localhost:8081 (kubectl port-forward listens on localhost only unless --address is given)
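# After applying the manifest it can take a while for the job to start. The sketch below polls the FlinkDeployment until the operator reports the job as RUNNING. It assumes the job state is exposed at .status.jobStatus.state of the resource; the function name wait_for_running, the 5-second interval, and the 120-second default timeout are choices made for this example.

```shell
# Poll a FlinkDeployment until its job state is RUNNING.
# Arguments: NAME [TIMEOUT_SECONDS] (timeout defaults to 120).
wait_for_running() {
  local name="$1"
  local timeout="${2:-120}"
  local waited=0
  local state=""
  while [ "$waited" -lt "$timeout" ]; do
    # Read the job state recorded by the operator in the resource status.
    state="$(kubectl get flinkdeployment "$name" -o jsonpath='{.status.jobStatus.state}' 2>/dev/null)"
    if [ "$state" = "RUNNING" ]; then
      echo "$name is RUNNING"
      return 0
    fi
    sleep 5
    waited=$((waited + 5))
  done
  echo "$name did not reach RUNNING within ${timeout}s (last state: ${state:-unknown})" >&2
  return 1
}
```

# Example: wait_for_running basic-example 300 && kubectl port-forward svc/basic-example-rest 8081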