作者:周凱波(寶牛)
千呼萬喚始出來,在 Kubernetes 如火如荼的今天,Flink 社群終于在 1.10 版本提供了對 Kubernetes 的原生支援,也就是
Native Kubernetes Integration。不過還隻是Beta版本,預計會在 1.11 版本裡面提供完整的支援。
我們知道,在 Flink 1.9 以及之前的版本裡面,如果要在 Kubernetes 上運作 Flink 任務是需要事先指定好需要的 TaskManager(TM) 的個數以及CPU和記憶體的。這樣的問題是:大多數情況下,你在任務啟動前根本無法精确的預估這個任務需要多少個TM。如果指定的TM多了,會導緻資源浪費;如果指定的TM個數少了,會導緻任務排程不起來。本質原因是在 Kubernetes 上運作的 Flink 任務并沒有直接向 Kubernetes 叢集去申請資源。
Flink 在 1.10 版本完成了
Active Kubernetes Integration
的第一階段,支援了 session clusters。後續的第二階段會提供更完整的支援,如支援 per-job 任務送出,以及基于原生 Kubernetes API 的高可用,支援更多的 Kubernetes 參數如 toleration, label 和 node selector 等。 Active Kubernetes Integration
中的 Active
意味着 Flink 的 ResourceManager (KubernetesResourceManager) 可以直接和 Kubernetes 通信,按需申請新的 Pod,類似于 Flink 中對 Yarn 和 Mesos 的內建所做的那樣。在多租戶環境中,使用者可以利用 Kubernetes 裡面的 namespace 做資源隔離啟動不同的 Flink 叢集。當然,Kubernetes 叢集中的使用者帳号和賦權是需要提前準備好的。 原理

工作原理如下(段首的序号對應圖中箭頭所示的數字):
- Flink 用戶端首先連接配接 Kubernetes API Server,送出 Flink 叢集的資源描述檔案,包括 configmap,job manager service,job manager deployment 和 Owner Reference 。
- Kubernetes Master 就會根據這些資源描述檔案去建立對應的 Kubernetes 實體。以我們最關心的 job manager deployment 為例,Kubernetes 叢集中的某個節點收到請求後,Kubelet 程序會從中央倉庫下載下傳 Flink 鏡像,準備和挂載 volume,然後執行啟動指令。在 flink master 的 pod 啟動後,Dispacher 和 KubernetesResourceManager 也都啟動了。
前面兩步完成後,整個 Flink session cluster 就啟動好了,可以接受送出任務請求了。
- 使用者可以通過 Flink 指令行即 flink client 往這個 session cluster 送出任務。此時 job graph 會在 flink client 端生成,然後和使用者 jar 包一起通過 RestClinet 上傳。
- 一旦 job 送出成功,JobSubmitHandler 收到請求就會送出 job 給 Dispatcher。接着就會生成一個 job master。
- JobMaster 向 KubernetesResourceManager 請求 slots。
- KubernetesResourceManager 從 Kubernetes 叢集配置設定 TaskManager。每個TaskManager都是具有唯一表示的 Pod。KubernetesResourceManager 會為 TaskManager 生成一份新的配置檔案,裡面有 Flink Master 的 service name 作為位址。這樣在 Flink Master failover之後,TaskManager 仍然可以重新連上。
- Kubernetes 叢集配置設定一個新的 Pod 後,在上面啟動 TaskManager。
- TaskManager 啟動後注冊到 SlotManager。
- SlotManager 向 TaskManager 請求 slots。
- TaskManager 提供 slots 給 JobMaster。然後任務就會被配置設定到這個 slots 上運作。
實踐
Flink 的
文檔上對如何使用已經寫的比較詳細了,不過剛開始總會踩到一些坑。如果對 Kubernetes 不熟,可能會花點時間。
(1) 首先得有個 Kubernetes 叢集,會有個
~/.kube/config
檔案。嘗試執行 kubectl get nodes 看下叢集是否正常。
如果沒有這個
~/.kube/config
檔案,會報錯:
2020-02-17 22:27:17,253 WARN io.fabric8.kubernetes.client.Config - Error reading service account token from: [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
2020-02-17 22:27:17,437 ERROR org.apache.flink.kubernetes.cli.KubernetesSessionCli - Error while running the Flink session.
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [Service] with name: [flink-cluster-81832d75-662e-40fd-8564-cd5a902b243c] in namespace: [default] failed.
at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:231)
at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:164)
at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getService(Fabric8FlinkKubeClient.java:334)
at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getInternalService(Fabric8FlinkKubeClient.java:246)
at org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:104)
at org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:185)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:185)
Caused by: java.net.UnknownHostException: kubernetes.default.svc: nodename nor servname provided, or not known
(2) 提前建立好使用者和賦權(
RBAC)
kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink
如果沒有建立使用者,使用預設的使用者去送出,會報錯:
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.10.0.1/api/v1/namespaces/default/pods?labelSelector=app%3Dkaibo-test%2Ccomponent%3Dtaskmanager%2Ctype%3Dflink-native-kubernetes.
Message: Forbidden!Configured service account doesn't have access.
Service account may have been revoked. pods is forbidden:
User "system:serviceaccount:default:default" cannot list resource "pods" in API group "" in the namespace "default".
(3) 這一步是可選的。預設情況下, JobManager 和 TaskManager 隻會将 log 寫到各自 pod 的 /opt/flink/log 。如果想通過 kubectl logs 看到日志,需要将 log 輸出到控制台。要做如下修改 FLINK_HOME/conf 目錄下的 log4j.properties 檔案。
log4j.rootLogger=INFO, file, console
# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
然後啟動 session cluster 的指令行需要帶上參數:
-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
(4) 終于可以開始啟動 session cluster了。如下指令是啟動一個每個 TaskManager 是4G記憶體,2個CPU,4個slot 的 session cluster。
bin/kubernetes-session.sh -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" -Dkubernetes.cluster-id=kaibo-test -Dtaskmanager.memory.process.size=4096m -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=4
更多的參數詳見文檔:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#kubernetes使用
kubectl logs kaibo-test-6f7dffcbcf-c2p7g -f
就能看到日志了。
如果出現大量的這種日志(目前遇到是雲廠商的LoadBalance liveness探測導緻):
2020-02-17 14:58:56,323 WARN org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Unhandled exception
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:377)
at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)
可以暫時在 log4j.properties 裡面配置上:
log4j.logger.org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint=ERROR, file
這個日志太多會導緻 WebUI 上打開 jobmanger log 是空白,因為檔案太大了前端無法顯示。
如果前面第(1)和第(2)步沒有做,會出現各種異常,通過 kubectl logs 就能很友善的看到日志了。
Session cluster 啟動後可以通過 kubectl get pods,svc 來看是否正常。
通過端口轉發來檢視 Web UI:
kubectl port-forward service/kaibo-test 8081
打開
http://127.0.0.1:8001就能看到 Flink 的 WebUI 了。
(5) 送出任務
./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=kaibo-test examples/streaming/TopSpeedWindowing.jar
我們從 Flink WebUI 頁面上可以看到,剛開始啟動時,UI上顯示 Total/Available Task Slots 為0, Task Managers 也是0。随着任務的送出,資源會動态增加。任務停止後,資源就會釋放掉。
在送出任務後,通過 kubectl get pods 能夠看到 Flink 為 TaskManager 配置設定了新的 Pod。
(6) 停止 session cluster
echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=kaibo-test -Dexecution.attached=true
也可以手工删除資源:
kubectl delete service/<ClusterID>
總結
可以看到,Flink 1.10 版本對和 Kubernetes 的內建做了很好的嘗試。期待社群後續的 1.11 版本能對 per-job 提供支援,以及和 Kubernetes 的深度內建,例如基于原生 Kubernetes API 的高可用。最新進展請關注
FLINK-14460