Flink On K8s
Flink K8S部署
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
https://dlcdn.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-src.tgz
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
--set image.repository=apache/flink-kubernetes-operator
kubectl get pods
NAME READY STATUS RESTARTS AGE
flink-kubernetes-operator-fb5d46f94-ghd8b 2/2 Running 0 4m21s
helm list
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION
flink-kubernetes-operator default 1 2022-03-09 17 (tel:12022030917):39:55.461359 +0100 CET deployed flink-kubernetes-operator-1.10.0 1.10.0
kubectl create -f https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.10/examples/basic.yaml
kubectl logs -f deploy/basic-example
2022-03-11 21:46:04,458 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 206 (type=CHECKPOINT) @ 1647035164458 for job a12c04ac7f5d8418d8ab27931bf517b7.
2022-03-11 21:46:04,465 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 206 for job a12c04ac7f5d8418d8ab27931bf517b7 (28509 bytes, checkpointDuration=7 ms, finalizationTime=0 ms).
2022-03-11 21:46:06,458 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 207 (type=CHECKPOINT) @ 1647035166458 for job a12c04ac7f5d8418d8ab27931bf517b7.
2022-03-11 21:46:06,483 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 207 for job a12c04ac7f5d8418d8ab27931bf517b7 (28725 bytes, checkpointDuration=25 ms, finalizationTime=0 ms).
kubectl port-forward svc/basic-example-rest 8081
kubectl delete flinkdeployment/basic-example
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
...
spec:
...
mode: standalone
k8s高可用环境
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
spec:
flinkConfiguration:
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///flink-data/ha
web.upload.dir: /flink-data # 会自动创建flink-web-upload目录保存上传的jar包,共享存储支持多实例挂载文件系统
jobManager:
replicas: 2
taskManager:
replicas: 2
podTemplate:
spec:
containers:
- name: flink-main-container
volumeMounts:
- mountPath: /flink-data
name: flink-volume
volumes:
- name: flink-volume
persistentVolumeClaim:
claimName: flink-ha
vim flinkcluster.yaml
apiVersion: flink.apache.org/v1
kind: FlinkCluster
metadata:
name: my-flink-cluster
namespace: default
spec:
job:
jarURI: local:///path/to/your/flink-job.jar
taskManager:
replicas: 2
jobManager:
replicas: 2
flinkConfiguration:
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha
kubectl apply -f flinkcluster.yaml
vim flink-job.yaml
apiVersion: flink.apache.org/v1
kind: FlinkApplication
metadata:
name: flink-job
namespace: default
spec:
flinkCluster:
name: my-flink-cluster
job:
jarURI: local:///path/to/your/flink-job.jar
parallelism: 2
kubectl apply -f flink-job.yaml
vim service-monitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: flink
labels:
app: flink
spec:
selector:
matchLabels:
app: flink
endpoints:
- port: http
path: /metrics
kubectl apply -f service-monitor.yaml
Session集群
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: basic-session-deployment-example
spec:
image: flink:1.17
flinkVersion: v1_17
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "2048m"
cpu: 1
serviceAccount: flink
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: basic-session-job-example
spec:
deploymentName: basic-session-deployment-example
job:
jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
parallelism: 4
upgradeMode: stateless
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: basic-session-job-example2
spec:
deploymentName: basic-session-deployment-example
job:
jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1.jar
parallelism: 2
upgradeMode: stateless
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
application模式
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: basic-example
spec:
image: flink:1.17
flinkVersion: v1_17
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "2048m"
cpu: 1
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 2
upgradeMode: stateless
kubectl apply -f ${name}.yaml
kubectl port-forward svc/basic-example-rest 8081