Flink Kubernetes Operator
Flink Kubernetes Operator 使用 Kubernetes API,提供云原生管理 Flink 集群的能力:
- 部署、监控 Flink Session 和 Application 应用
- 升级、暂停和删除应用
- 日志和 metrics 集成
- 支持弹性部署,与 Kuberentes 生态原生集成
体验
-
环境要求
-
准备 kubernetes 环境
- docker desktop 自带的 kubernetes
- k3s
- minikube
-
创建 Flink 任务专用 service account
kubectl create serviceaccount flink-service-account kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account
-
安装 flink-kubernetes-operator
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.5.0/ helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --set image.repository=apache/flink-kubernetes-operator # 查看安装状态 kubectl get deployment # 查看安装详情 kubectl describe deployment flink-kubernetes-operator
-
提交任务
# 任务创建时需要拉取 flink 镜像,为了安装体验可以预先拉取镜像 docker pull flink:1.16 # 提交任务 kubectl create -f https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.4/examples/basic.yaml
-
查看任务
# 查看任务信息 kubectl get deployment kubectl get pods # 查看任务日志 kubectl logs -f deploy/basic-example # 访问 flink web-ui # 访问 http://localhost:8081 kubectl port-forward svc/basic-example-rest 8081
-
关闭任务
kubectl delete flinkdeployment/basic-example
Ingress
在 Kubernetes 中,外部访问集群内的服务有两种方式:service 和 ingress。其中 Flink 的 web ui 对 service 的 3 种类型都进行了支持,参考链接:
- ClusterIP
- NodePort
- LoadBalancer
Flink Kubernetes Operator 并不干涉 Flink web ui 的功能,用户在通过 Flink Kubernetes Operator 部署 Flink 任务的时候,仍然可以使用上述 3 种方式来访问 Flink web ui。但除此之外,Flink Kubernetes Operator 提供 ingress 配置,可以让用户在未配置外部访问的情况下,访问到 Flink web ui。
-
安装 nginx ingress
# 安装 ingress-nginx kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.8.0/deploy/static/provider/cloud/deploy.yaml # 检验安装结果 kubectl get pods -n ingress-nginx kubectl get services -n ingress-nginx
-
yaml 增加 ingress 配置
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: advanced-ingress spec: image: flink:1.17 flinkVersion: v1_17 ingress: template: "/{{namespace}}/{{name}}(/|$)(.*)" className: "nginx" annotations: nginx.ingress.kubernetes.io/rewrite-target: "/$2" flinkConfiguration: taskmanager.numberOfTaskSlots: "2" serviceAccount: flink jobManager: resource: memory: "1024m" cpu: 0.1 taskManager: resource: memory: "1024m" cpu: 0.25 job: jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar parallelism: 2
-
部署任务
# 部署任务 kubectl apply -f advanced-ingress.yaml # 查看任务 kubectl get FlinkDeployment kubectl get deployment kubectl get pods kubectl get ingress -A kubectl describe ingress $ingress kubectl get services
-
删除任务。
kubectl delete -f advanced-ingress.yaml
CRD
Flink 除了 Standalone,还支持多种资源调度框架如 YARN、Kubernetes。在 Kubernetes 上支持 Application 和 Session 模式。Flink Kubernetes Operator 同时提供在 Kubernetes 上创建 Standalone 集群,以 Application 和 Session 模式原生运行 Flink 的功能。
Flink Kubernetes Operator 以 CRD 的方式提供云原生配置,分别定义 FlinkDeployment
和 FlinkSessionJob
。
FlinkDeployment
包含信息如下:
- JobSpec
- flinkConfiguration
- logConfiguration
- JobManager,Taskmanager,PodTemplate
- IngressSpec
JobSpec
/** Flink job spec. */
@Experimental
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@EqualsAndHashCode
@JsonIgnoreProperties(ignoreUnknown = true)
public class JobSpec implements Diffable<JobSpec> {
private String jarURI;
private String entryClass;
private String[] args = new String[0];
private String initialSavepointPath;
private Boolean allowNonRestoredState;
private int parallelism;
private UpgradeMode upgradeMode = UpgradeMode.STATELESS;
private JobState state = JobState.RUNNING;
private Long savepointTriggerNonce;
}
JobManagerSpec
,TaskmanagerSpec
,PodTemplate
/** JobManager spec. */
@Experimental
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@JsonIgnoreProperties(ignoreUnknown = true)
public class JobManagerSpec {
private Resource resource;
private int replicas = 1;
private Pod podTemplate;
}
/** TaskManager spec. */
@Experimental
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@JsonIgnoreProperties(ignoreUnknown = true)
public class TaskManagerSpec implements Diffable<TaskManagerSpec> {
private Resource resource;
private Integer replicas;
private Pod podTemplate;
}
/** Resource spec. */
@Experimental
@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class Resource {
private Double cpu;
private String memory;
private String ephemeralStorage;
}
IngresssSpec
/** Ingress spec. */
@Experimental
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@JsonIgnoreProperties(ignoreUnknown = true)
public class IngressSpec {
private String template;
private String className;
private Map<String, String> annotations;
}
flinkConfiguration
, logConfiguration
private Map<String, String> flinkConfiguration;
private Map<String, String> logConfiguration;