Flink Kubernetes Operator

The Flink Kubernetes Operator uses the Kubernetes API to manage Flink clusters in a cloud-native way:

  • Deploy and monitor Flink session and application deployments
  • Upgrade, suspend and delete deployments
  • Logging and metrics integration
  • Flexible deployments and native integration with the Kubernetes ecosystem

Installation

  1. Requirements

    1. Kubernetes
    2. Helm
  2. Install flink-kubernetes-operator. Refer to Try the Flink Kubernetes Operator

    # install cert-manager
    kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
    
    # install flink-kubernetes-operator
    helm repo add flink-kubernetes-operator-1.8.0 https://archive.apache.org/dist/flink/flink-kubernetes-operator-1.8.0/
    
    # scaleph adjusts some of the flink-kubernetes-operator defaults; apply ${SCALEPH_HOME}/tools/kubernetes/flink/values.yaml when installing
    # tools/kubernetes/flink/values.yaml does not support FlinkSessionJob jobs
    # tools/kubernetes/flink/values-session.yaml supports FlinkSessionJob jobs once the IP in http://${IP}:9000 is replaced with the real Minio address
    helm install flink-kubernetes-operator flink-kubernetes-operator-1.8.0/flink-kubernetes-operator --values tools/kubernetes/flink/values.yaml
    
    # check the installation status
    kubectl get deployment
    # view the installation details
    kubectl describe deployment flink-kubernetes-operator
  3. Submit a job (a rough sketch of the manifest created by basic.yaml is shown after this list)

    # creating the job pulls the flink image; for a smoother first run you can pull it in advance
    # docker pull flink:1.17
    
    # submit the job
    kubectl create -f https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.8/examples/basic.yaml
  4. Check the job

    # view job information
    kubectl get deployment
    kubectl get pods
    
    # view job logs
    kubectl logs -f deploy/basic-example
    
    # access the flink web ui
    # open http://localhost:8081
    kubectl port-forward svc/basic-example-rest 8081
  5. Delete the job

    kubectl delete flinkdeployment/basic-example
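
For reference, examples/basic.yaml in the operator repository defines a FlinkDeployment roughly equivalent to the sketch below (the resource values are illustrative and may differ slightly from the actual example file):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    # two slots per TaskManager
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    # example job shipped inside the flink image
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless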
TIP

tools/kubernetes/flink/values.yaml does not enable the Flink FileSystem plugins and therefore does not support FlinkSessionJob jobs. To support FlinkSessionJob jobs, use tools/kubernetes/flink/values-session.yaml

NOTE

Before use, replace the IP variable in s3.endpoint: http://${IP}:9000 in tools/kubernetes/flink/values-session.yaml with the real Minio address
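
A minimal sketch of that substitution using GNU sed, assuming Minio is reachable at 192.168.1.100 (an example address, adjust it to your environment):

# replace the ${IP} placeholder in the s3.endpoint setting with the real Minio address
sed -i 's/${IP}/192.168.1.100/g' tools/kubernetes/flink/values-session.yaml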

Ingress

In Kubernetes, there are two ways to reach services inside the cluster from the outside: service and ingress. Flink's web UI supports all three service types (see the reference link):

  • ClusterIP
  • NodePort
  • LoadBalancer
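
As a minimal sketch of the service route, the type of the rest service can be chosen through the standard Flink option kubernetes.rest-service.exposed.type in the FlinkDeployment's flinkConfiguration, for example:

# excerpt of a FlinkDeployment spec: expose the rest service (and web ui) as a NodePort
spec:
  flinkConfiguration:
    kubernetes.rest-service.exposed.type: NodePort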

The Flink Kubernetes Operator does not interfere with the Flink web UI: when deploying Flink jobs through the operator, users can still access the web UI via any of the three service types above. In addition, the Flink Kubernetes Operator provides an ingress configuration that lets users reach the Flink web UI even when no external access has been configured.

  1. Install nginx-ingress-controller

  2. Add the ingress configuration to the yaml. Refer to Ingress

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: advanced-ingress
    spec:
      image: flink:1.17
      flinkVersion: v1_17
      ingress:
        template: "/{{namespace}}/{{name}}(/|$)(.*)"
        className: "nginx"
        annotations:
          nginx.ingress.kubernetes.io/rewrite-target: "/$2"
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
      serviceAccount: flink
      jobManager:
        resource:
          memory: "1024m"
          cpu: 0.1
      taskManager:
        resource:
          memory: "1024m"
          cpu: 0.25
      job:
        jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
        parallelism: 2
  3. Deploy the job

    # deploy the job
    kubectl apply -f advanced-ingress.yaml
    
    # check the job
    kubectl get FlinkDeployment
    
    kubectl get deployment
    
    kubectl get pods
    
    kubectl get ingress -A
    kubectl describe ingress $ingress
    
    kubectl get services
  4. Access the job at https://localhost/default/advanced-ingress/ (see the check after this list)

  5. Delete the job.

    kubectl delete -f advanced-ingress.yaml
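
With the rewrite-target annotation above, a request to /default/advanced-ingress/<path> is forwarded to <path> on the job's rest service, so both the web UI and the Flink REST API are reachable through the ingress. A quick check (a sketch that assumes the nginx ingress controller listens on localhost with a self-signed certificate):

# open the web ui through the ingress
curl -k https://localhost/default/advanced-ingress/

# the Flink REST API is rewritten the same way, e.g. list running jobs
curl -k https://localhost/default/advanced-ingress/jobs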

Monitoring

flink-kubernetes-operator inherits Flink's metrics capabilities. Refer to: Metrics and Logging

When installing flink-kubernetes-operator, add the following to values.yaml to enable prometheus monitoring:

defaultConfiguration:
  # If set to true,
  #      (1) loads the built-in default configuration
  #      (2) appends the below flink-conf and logging configuration overrides
  # If set to false, loads just the overrides as in (2).
  # This option has no effect if create is equal to false.
  append: true
  flink-conf.yaml: |+
    # Flink Config Overrides
    kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
    kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE

    kubernetes.operator.reconcile.interval: 15 s
    kubernetes.operator.observer.progress-check.interval: 5 s
    
    kubernetes.operator.metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    kubernetes.operator.metrics.reporter.prom.port: 9999

# (Optional) Exposes metrics port on the container if defined
metrics:
  port: 9999
TIP

Both tools/kubernetes/flink/values.yaml and tools/kubernetes/flink/values-session.yaml include the above configuration by default

Create a Service that exposes the flink kubernetes operator's metrics port

apiVersion: v1
kind: Service
metadata:
  name: flink-kubernetes-operator-metrics
  annotations:
    prometheus.io/port: "9999"
    prometheus.io/scrape: "true"
  labels:
    # Define the metrics service labels matched by the prometheus operator's ServiceMonitor
    app: flink-kubernetes-operator-metrics
    component: metrics
    platform: scaleph
spec:
  # Match the flink kubernetes operator pods by their labels
  # todo add a scaleph-specific annotation so that only FlinkDeployments managed by scaleph are scraped
  selector:
    app.kubernetes.io/name: flink-kubernetes-operator
  ports:
    - name: prom-metrics
      port: 9999
      targetPort: 9999
      protocol: TCP

Create a ServiceMonitor that collects the flink kubernetes operator's metrics

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: flink-kubernetes-operator-metrics
  labels:
    # Define service monitor labels for the prometheus operator's Prometheus
    # todo define platform: scaleph consistently
    group: flink-kubernetes-operator-metrics
    platform: scaleph
    ...
spec:
  selector:
    # Match the metrics service labels defined above
    matchLabels:
      app: flink-kubernetes-operator-metrics
      component: metrics
      platform: scaleph
      ...
  endpoints:
    - port: prom-metrics # Above service ports name
      interval: 2s
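
To verify that the operator metrics are actually exposed before wiring up Prometheus, the reporter port can be checked directly through the Service defined above (a sketch; run the two commands in separate terminals):

# forward the metrics port locally
kubectl port-forward svc/flink-kubernetes-operator-metrics 9999:9999

# the prometheus reporter serves plain-text metrics over HTTP
curl http://localhost:9999/metrics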