3. Flink On K8s

Flink On K8s

Flink K8S部署

# Flink Kubernetes Operator
# 在 Kubernetes 集群上安装证书管理器以启用添加 Webhook 组件(每个 Kubernetes 集群只需要一次):
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
# 如果证书管理器安装因任何原因失败,您可以在 Operator 的 helm install 命令中传入 --set webhook.create=false 来禁用 Webhook。
# 现在 ,您可以使用随附的 Helm Chart 部署选定的稳定 Flink Kubernetes Operator 版本:
# Operator 源码包下载地址:https://dlcdn.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-src.tgz
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
# 要查找稳定版本列表 ,请访问 https://flink.apache.org/downloads.html

# 默认情况下,Helm Chart 指向 ghcr.io/apache/flink-kubernetes-operator 镜像仓库。如果您有连接问题或更喜欢使用 Dockerhub,则可以在 helm install 时追加以下参数:
--set image.repository=apache/flink-kubernetes-operator
# 您可以通过以下方式验证您的安装:

kubectl get pods
NAME READY STATUS RESTARTS AGE
flink-kubernetes-operator-fb5d46f94-ghd8b 2/2 Running 0 4m21s

helm list
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION
flink-kubernetes-operator default 1 2022-03-09 17:39:55.461359 +0100 CET deployed flink-kubernetes-operator-1.10.0 1.10.0
# 提交 Flink 作业
# 如上一步所示,Operator 运行后,您就可以提交 Flink 作业了:

kubectl create -f https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.10/examples/basic.yaml
# 您可以跟踪作业的日志;作业成功启动后(在全新环境中可能需要一分钟左右,之后只需几秒钟),可以看到类似如下的输出:

kubectl logs -f deploy/basic-example

2022-03-11 21:46:04,458 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 206 (type=CHECKPOINT) @ 1647035164458 for job a12c04ac7f5d8418d8ab27931bf517b7.
2022-03-11 21:46:04,465 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 206 for job a12c04ac7f5d8418d8ab27931bf517b7 (28509 bytes, checkpointDuration=7 ms, finalizationTime=0 ms).
2022-03-11 21:46:06,458 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 207 (type=CHECKPOINT) @ 1647035166458 for job a12c04ac7f5d8418d8ab27931bf517b7.
2022-03-11 21:46:06,483 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 207 for job a12c04ac7f5d8418d8ab27931bf517b7 (28725 bytes, checkpointDuration=25 ms, finalizationTime=0 ms).
# 要公开 Flink Dashboard ,您可以添加 port-forward 规则或查看 ingress 配置选项:

kubectl port-forward svc/basic-example-rest 8081
# 现在 Flink Dashboard 可以在 localhost:8081 上访问。
# 为了停止你的作业并删除你的 FlinkDeployment ,你可以:
kubectl delete flinkdeployment/basic-example

# Flink Kubernetes Operator 支持:原生部署native(默认)和独立部署standalone
# 可以使用部署规范中的 mode 字段设置部署模式。
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
...
spec:
  ...
  mode: standalone
# Flink Kubernetes Operator 支持两种主要类型的部署:Application集群 和 Session集群  ,k8s上不支持Job集群。
# Session集群 ,是常规的共享方式。
# Application集群 ,提供了更好的隔离 ,生命周期与程序逻辑有关。
# Job集群 ,适合长期运行、要求高稳定性的大型作业。(启动慢)

k8s高可用环境

# Flink 提供了两种高可用服务实现:
# ZooKeeper:每个 Flink 集群部署都可以使用 ZooKeeper HA 服务。它们需要一个运行的 ZooKeeper 复制组(quorum)。
# Kubernetes:Kubernetes HA 服务只能运行在 Kubernetes 上。
# 注:Flink 会持久化元数据和作业工件(job artifacts);这些 HA 数据会一直保留到作业执行成功、被取消或最终失败,之后才会被删除。

# 配置高可用环境示例-Kubernetes
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
spec:
  flinkConfiguration:
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
    web.upload.dir: /flink-data  # 会自动创建flink-web-upload目录保存上传的jar包,共享存储支持多实例挂载文件系统
  jobManager:
    replicas: 2
  taskManager:
    replicas: 2
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
      volumes:
        - name: flink-volume
          persistentVolumeClaim:
            claimName: flink-ha

# 配置高可用环境示例-zookeeper(注意:ZooKeeper HA 还需要在 flinkConfiguration 中配置 high-availability.zookeeper.quorum,指向 ZooKeeper 复制组地址)
vim flinkcluster.yaml
apiVersion: flink.apache.org/v1
kind: FlinkCluster
metadata:
  name: my-flink-cluster
  namespace: default
spec:
  job:
    jarURI: local:///path/to/your/flink-job.jar
  taskManager:
    replicas: 2
  jobManager:
    replicas: 2
  flinkConfiguration:
    high-availability: zookeeper
    high-availability.storageDir: hdfs:///flink/ha

kubectl apply -f flinkcluster.yaml

# 部署 Flink 作业
vim flink-job.yaml
apiVersion: flink.apache.org/v1
kind: FlinkApplication
metadata:
  name: flink-job
  namespace: default
spec:
  flinkCluster:
    name: my-flink-cluster
  job:
    jarURI: local:///path/to/your/flink-job.jar
    parallelism: 2

kubectl apply -f flink-job.yaml

# 监控与维护
vim service-monitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: flink
  labels:
    app: flink
spec:
  selector:
    matchLabels:
      app: flink
  endpoints:
  - port: http
    path: /metrics

kubectl apply -f service-monitor.yaml

Session集群

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-session-deployment-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  serviceAccount: flink
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example
spec:
  deploymentName: basic-session-deployment-example
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
    parallelism: 4
    upgradeMode: stateless

---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example2
spec:
  deploymentName: basic-session-deployment-example
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1.jar
    parallelism: 2
    upgradeMode: stateless
    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample

application模式

# 此crd创建后 ,operator会创建:
# 1个deploy(即jobmanager,镜像为flink:1.17)
# 1个pod(即taskManager,镜像也是flink:1.17,任务jar包在镜像中)
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

kubectl apply -f ${name}.yaml

kubectl port-forward svc/basic-example-rest 8081

# 访问 http://localhost:8081(port-forward 默认只监听本机;如需通过主机 ip 访问,请给 port-forward 加上 --address 0.0.0.0)