天天看點

Flink 1.10 Native Kubernetes 原理與實踐

作者:周凱波(寶牛)

千呼萬喚始出來,在 Kubernetes 如火如荼的今天,Flink 社群終于在 1.10 版本提供了對 Kubernetes 的原生支援,也就是

Native Kubernetes Integration

。不過還隻是Beta版本,預計會在 1.11 版本裡面提供完整的支援。

我們知道,在 Flink 1.9 以及之前的版本裡面,如果要在 Kubernetes 上運作 Flink 任務是需要事先指定好需要的 TaskManager(TM) 的個數以及CPU和記憶體的。這樣的問題是:大多數情況下,你在任務啟動前根本無法精确的預估這個任務需要多少個TM。如果指定的TM多了,會導緻資源浪費;如果指定的TM個數少了,會導緻任務排程不起來。本質原因是在 Kubernetes 上運作的 Flink 任務并沒有直接向 Kubernetes 叢集去申請資源。

Flink 在 1.10 版本完成了

Active Kubernetes Integration

的第一階段,支援了 session clusters。後續的第二階段會提供更完整的支援,如支援 per-job 任務送出,以及基于原生 Kubernetes API 的高可用,支援更多的 Kubernetes 參數如 toleration, label 和 node selector 等。

Active Kubernetes Integration

中的

Active

意味着 Flink 的 ResourceManager (KubernetesResourceManager) 可以直接和 Kubernetes 通信,按需申請新的 Pod,類似于 Flink 中對 Yarn 和 Mesos 的內建所做的那樣。在多租戶環境中,使用者可以利用 Kubernetes 裡面的 namespace 做資源隔離啟動不同的 Flink 叢集。當然,Kubernetes 叢集中的使用者帳号和賦權是需要提前準備好的。

原理

Flink 1.10 Native Kubernetes 原理與實踐

工作原理如下(段首的序号對應圖中箭頭所示的數字):

  1. Flink 用戶端首先連接配接 Kubernetes API Server,送出 Flink 叢集的資源描述檔案,包括 configmap,job manager service,job manager deployment 和 Owner Reference
  2. Kubernetes Master 就會根據這些資源描述檔案去建立對應的 Kubernetes 實體。以我們最關心的 job manager deployment 為例,Kubernetes 叢集中的某個節點收到請求後,Kubelet 程序會從中央倉庫下載下傳 Flink 鏡像,準備和挂載 volume,然後執行啟動指令。在 flink master 的 pod 啟動後,Dispacher 和 KubernetesResourceManager 也都啟動了。

前面兩步完成後,整個 Flink session cluster 就啟動好了,可以接受送出任務請求了。

  1. 使用者可以通過 Flink 指令行即 flink client 往這個 session cluster 送出任務。此時 job graph 會在 flink client 端生成,然後和使用者 jar 包一起通過 RestClinet 上傳。
  2. 一旦 job 送出成功,JobSubmitHandler 收到請求就會送出 job 給 Dispatcher。接着就會生成一個 job master。
  3. JobMaster 向 KubernetesResourceManager 請求 slots。
  4. KubernetesResourceManager 從 Kubernetes 叢集配置設定 TaskManager。每個TaskManager都是具有唯一表示的 Pod。KubernetesResourceManager 會為 TaskManager 生成一份新的配置檔案,裡面有 Flink Master 的 service name 作為位址。這樣在 Flink Master failover之後,TaskManager 仍然可以重新連上。
  5. Kubernetes 叢集配置設定一個新的 Pod 後,在上面啟動 TaskManager。
  6. TaskManager 啟動後注冊到 SlotManager。
  7. SlotManager 向 TaskManager 請求 slots。
  8. TaskManager 提供 slots 給 JobMaster。然後任務就會被配置設定到這個 slots 上運作。

實踐

Flink 的

文檔

上對如何使用已經寫的比較詳細了,不過剛開始總會踩到一些坑。如果對 Kubernetes 不熟,可能會花點時間。

(1) 首先得有個 Kubernetes 叢集,會有個

~/.kube/config

檔案。嘗試執行 kubectl get nodes 看下叢集是否正常。

如果沒有這個

~/.kube/config

檔案,會報錯:

2020-02-17 22:27:17,253 WARN  io.fabric8.kubernetes.client.Config                           - Error reading service account token from: [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
2020-02-17 22:27:17,437 ERROR org.apache.flink.kubernetes.cli.KubernetesSessionCli          - Error while running the Flink session.
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [Service]  with name: [flink-cluster-81832d75-662e-40fd-8564-cd5a902b243c]  in namespace: [default]  failed.
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:231)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:164)
    at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getService(Fabric8FlinkKubeClient.java:334)
    at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getInternalService(Fabric8FlinkKubeClient.java:246)
    at org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:104)
    at org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:185)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:185)
Caused by: java.net.UnknownHostException: kubernetes.default.svc: nodename nor servname provided, or not known           

(2) 提前建立好使用者和賦權(

RBAC

)

kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink           

如果沒有建立使用者,使用預設的使用者去送出,會報錯:

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.10.0.1/api/v1/namespaces/default/pods?labelSelector=app%3Dkaibo-test%2Ccomponent%3Dtaskmanager%2Ctype%3Dflink-native-kubernetes. 

Message: Forbidden!Configured service account doesn't have access. 
Service account may have been revoked. pods is forbidden: 
User "system:serviceaccount:default:default" cannot list resource "pods" in API group "" in the namespace "default".           

(3) 這一步是可選的。預設情況下, JobManager 和 TaskManager 隻會将 log 寫到各自 pod 的 /opt/flink/log 。如果想通過 kubectl logs 看到日志,需要将 log 輸出到控制台。要做如下修改 FLINK_HOME/conf 目錄下的 log4j.properties 檔案。

log4j.rootLogger=INFO, file, console

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n           

然後啟動 session cluster 的指令行需要帶上參數:

-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"           

(4) 終于可以開始啟動 session cluster了。如下指令是啟動一個每個 TaskManager 是4G記憶體,2個CPU,4個slot 的 session cluster。

bin/kubernetes-session.sh -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" -Dkubernetes.cluster-id=kaibo-test -Dtaskmanager.memory.process.size=4096m -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=4           

更多的參數詳見文檔:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#kubernetes

使用

kubectl logs kaibo-test-6f7dffcbcf-c2p7g -f

就能看到日志了。

如果出現大量的這種日志(目前遇到是雲廠商的LoadBalance liveness探測導緻):

2020-02-17 14:58:56,323 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Unhandled exception
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:377)
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)           

可以暫時在 log4j.properties 裡面配置上:

log4j.logger.org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint=ERROR, file           

這個日志太多會導緻 WebUI 上打開 jobmanger log 是空白,因為檔案太大了前端無法顯示。

如果前面第(1)和第(2)步沒有做,會出現各種異常,通過 kubectl logs 就能很友善的看到日志了。

Session cluster 啟動後可以通過 kubectl get pods,svc 來看是否正常。

通過端口轉發來檢視 Web UI:

kubectl port-forward service/kaibo-test 8081           

打開

http://127.0.0.1:8001

就能看到 Flink 的 WebUI 了。

(5) 送出任務

./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=kaibo-test examples/streaming/TopSpeedWindowing.jar           

我們從 Flink WebUI 頁面上可以看到,剛開始啟動時,UI上顯示 Total/Available Task Slots 為0, Task Managers 也是0。随着任務的送出,資源會動态增加。任務停止後,資源就會釋放掉。

在送出任務後,通過 kubectl get pods 能夠看到 Flink 為 TaskManager 配置設定了新的 Pod。

Flink 1.10 Native Kubernetes 原理與實踐

(6) 停止 session cluster

echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=kaibo-test -Dexecution.attached=true           

也可以手工删除資源:

kubectl delete service/<ClusterID>           

總結

可以看到,Flink 1.10 版本對和 Kubernetes 的內建做了很好的嘗試。期待社群後續的 1.11 版本能對 per-job 提供支援,以及和 Kubernetes 的深度內建,例如基于原生 Kubernetes API 的高可用。最新進展請關注

FLINK-14460