本文基于flink 1.13.x yarn 3.1.x編寫并驗證
1 前提
- 安裝yarn叢集并啟動
- 配置HADOOP_CLASSPATH環境變量
注意:預設情況下,所有必需的Hadoop配置檔案都是通過HADOOP_CLASSPATH環境變量從類路徑加載的。
2 Flink在YARN上支援的部署方式
對于生産使用,建議Per-job or Application Mode部署Flink應用程式,因為這些模式為應用程式提供了更好的隔離。
詳見參見: flink on yarn
2.1 Application Mode
Application Mode将在YARN上啟動一個Flink叢集,其中應用程式jar的main()方法将在YARN中的JobManager上執行。
應用程式一完成,叢集就會關閉。
可以通過yarn application -kill 或取消Flink作業手動停止叢集。
#送出任務
./bin/flink run-application -t yarn-application ./examples/batch/WordCount.jar --input hdfs://pci01:8020/tmp/README.txt
./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar -d
#列出叢集上正在運作的作業,列出jobId、jobName
./bin/flink list -t yarn-application -Dyarn.application.id=application_1626944535001_1008
#取消任務: jobId
#請注意,取消應用程式叢集上的作業将停止該叢集。
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_1626944535001_1008 825e0311eef66f586d0977a7c20a432d
為了釋放Application Mode的全部潛力,請考慮與yarn.provided.lib.dirs配置項一起使用
将flink的依賴jar、應用程式jar上傳到叢集中所有節點都可以通路的位置。
下述操作将使作業送出變得更加輕量級,因為所需的Flink jar和應用程式jar将由指定的遠端位置提取,而不是由客戶機發送到叢集。
#将flink lib及應用的jar上傳到hdfs
hdfs dfs -mkdir -p hdfs://pci01:8020/jars/flink
hdfs dfs -copyFromLocal lib/*.jar hdfs://pci01:8020/jars/flink/
hdfs dfs -mkdir -p hdfs://pci01:8020/jars/apps
hdfs dfs -copyFromLocal ./examples/streaming/TopSpeedWindowing.jar hdfs://pci01:8020/jars/apps/
hdfs dfs -copyFromLocal ./examples/batch/WordCount.jar hdfs://pci01:8020/jars/apps/
#運作TopSpeedWindowing
./bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://pci01:8020/jars/flink/" hdfs://pci01:8020/jars/apps/TopSpeedWindowing.jar
#運作wordcount示例
./bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://pci01:8020/jars/flink/" hdfs://pci01:8020/jars/apps/WordCount.jar --input hdfs://pci01:8020/tmp/README.txt --output hdfs://pci01:8020/tmp/wordcount-result10.txt
hdfs dfs -cat hdfs://pci01:8020/tmp/wordcount-result10.tx
2.2 Per-Job Cluster Mode
Per-Job Cluster Mode将在YARN上啟動一個Flink叢集,然後在本地運作提供的應用程式jar,最後将JobGraph送出給YARN上的JobManager。如果傳遞–detached參數,用戶端将在送出被接受後停止。
一個任務會對應一個Job,每送出一個作業會根據自身的情況,都會單獨向yarn申請資源,直到作業執行完成,一個作業的失敗與否并不會影響下一個作業的正常送出和運作。獨享Dispatcher和ResourceManager,按需接受資源申請;适合規模大長時間運作的作業。
一旦作業停止,Flink叢集就會停止。
#送出任務
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
#列出叢集上正在運作的作業, 列出jobId、jobName
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_1626944535001_1015
#取消任務: jobId
#請注意,取消應用程式叢集上的作業将停止該叢集。
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1626944535001_1015 3386aec1fb6fba288c13a17b5cb6730b
2.3 Session Mode
Session-Cluster模式需要先啟動叢集,然後再送出作業,接着會向yarn申請一塊空間後,資源永遠保持不變。如果資源滿了,下一個作業就無法送出,隻能等到yarn中的其中一個作業執行完成後,釋放了資源,下個作業才會正常送出。所有作業共享Dispatcher和ResourceManager;共享資源;适合規模小執行時間短的作業。
Session-Cluster的資源在啟動叢集時就定義完成,後續所有作業的送出都共享該資源,作業可能會互相影響,是以比較适合小規模短時間運作的作業
Session Mode将在/tmp/.yarn-properties-目錄下建立一個隐藏的YARN properties檔案,它将在送出作業時用于叢集發現,并在指令行界面中顯示,有如下兩種操作模式:
- attached mode (default): yarn-session.sh 用戶端将Flink叢集送出給YARN,但是用戶端繼續運作,跟蹤叢集的狀态。如果叢集失敗,用戶端将顯示錯誤。如果用戶端被終止,它也會發出關閉叢集的信号。
- detached mode (
or-d
): 将Flink叢集送出給YARN,用戶端傳回。需要通過以下指令停止--detached
#優雅停止 echo "stop" | ./bin/yarn-session.sh -id application_1626944535001_0984 #停止 yarn application -kill application_1626944535001_0984
除了通過conf/flink-conf.yaml檔案傳遞配置,也可以使用-Dkey=value參數在送出時将任何配置傳遞./bin/yarn-session.sh用戶端
2.3.1 在YARN上啟動一個長期的Flink叢集
-s參數表示每個Task Manager上可用的處理槽(processing slot)數量。建議把槽數量設定成每個機器處理器的個數。 一旦會話被啟動,就可以使用./bin/flink工具送出任務到叢集上。
cd flink/flink-1.13.2
./bin/yarn-session.sh -jm 2048 -tm 4096 -s 16 -d
#優雅停止
echo "stop" | ./bin/yarn-session.sh -id application_1626944535001_0984
#停止
yarn application -kill application_1626944535001_0984
# 檢視可以獲得的指令
echo "help" | ./bin/yarn-session.sh -id application_1626944535001_0984
# 檢視運作的application
yarn top
2.3.1.1 檢視啟動的flink叢集
在 Yarn Cluster頁面 http://172.25..:8088/cluster/apps/RUNNING,檢視 name為Flink session cluster的application, 點選Tracking UI中的ApplicationMaster連結,跳轉到Flink dashboard頁面
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiNx8FesU2cfdGLwczX0xiRGZkRGZ0Xy9GbvNGLwIzXlpXazxSPJhlWywWbjVnVHNWQClGVF5UMR9Fd4VGdsATNfd3bkFGazxycykFaKdkYzZUbapXNXlleSdVY2pESa9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLkR2NxETZ0UmMxMzN2kTN5U2M2QzM5IWN3UGOxUTOhBzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
在yarn-session.sh成功運作後,日志顯示如下:
在日志的末尾顯示了 flink dashboard位址 及 applicationId
2.3.1.2 檢視配置資訊
more /tmp/.yarn-properties-root
顯示内容如下:
#Generated YARN properties file
#Fri Sep 10 11:43:19 CST 2021
dynamicPropertiesString=
applicationID=application_1626944535001_1021
2.3.2 yarn-session.sh腳本參數
參數 | 名稱 | 解釋 |
---|---|---|
-at,–applicationType | Set a custom application type for the application on YARN | |
-D <property=value> | use value for given property | |
-d,–detached | If present, runs the job in detached mode | |
-h,–help | Help for the Yarn session CLI. | |
-id,–applicationId | Attach to running YARN session | |
-j,–jar | Path to Flink jar file | |
-jm,–jobManagerMemory | Memory for JobManager Container with optional unit (default: MB) | |
-m,–jobmanager | Set to yarn-cluster to use YARN execution mode. | |
-nl,–nodeLabel | Specify YARN node label for the YARN application | |
-nm,–name | Set a custom name for the application on YARN | |
-q,–query | Display available YARN resources (memory, cores) | |
-qu,–queue | Specify YARN queue. | |
-s,–slots | Number of slots per TaskManager | |
-t,–ship | Ship files in the specified directory (t for transfer) | |
-tm,–taskManagerMemory Memory per TaskManager Container with optional unit (default: MB) | ||
-yd,–yarndetached | If present, runs the job in detached mode (deprecated; use non-YARN specific option instead) | |
-z,–zookeeperNamespace | Namespace to create the Zookeeper sub-paths for high availability mode |
2.3.3 送出任務
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
./bin/flink run ./examples/batch/WordCount.jar --input hdfs://pci01:8020/tmp/README.txt --output hdfs://pci01:8020/tmp/wordcount-result20.txt
./bin/flink run -t yarn-session \
-Dyarn.application.id=application_1626944535001_1021 \
./examples/batch/WordCount.jar --input hdfs://pci01:8020/tmp/README.txt \
--output hdfs://pci01:8020/tmp/wordcount-result21.txt
3 flink on yarn配置
flink on yarn配置
以下配置參數是由Flink on YARN管理的,因為它們可能會在運作時被架構覆寫:
- jobmanager.rpc.address: 通過Flink on YARN上動态設定為JobManager容器的位址
- io.tmp.dirs:如果沒有設定,Flink将設定YARN定義的臨時目錄
- high-availability.cluster-id : 自動生成的ID,用于區分HA服務中的多個叢集
如果需要将額外的Hadoop配置檔案傳遞給Flink,可以通過HADOOP_CONF_DIR環境變量來實作,該變量接受一個包含Hadoop配置檔案的目錄名。預設情況下,所有必需的Hadoop配置檔案都是通過HADOOP_CLASSPATH環境變量從類路徑加載的。
4 Resource Allocation Behavior 資源配置行為
如果一個運作在YARN上的JobManager不能使用現有資源運作所有送出的作業,那麼它将請求額外的taskManager。特别是在Session Mode下運作時,如果需要,JobManager會在送出額外的作業時配置設定額外的taskManager。未使用的taskManager在逾時後再次釋放。
YARN實作将尊重JobManager和TaskManager程序的記憶體配置。預設情況下,報告的VCores數量等于每個TaskManager配置的槽位數量。yarn.containers.vcores允許使用自定義值覆寫vcores的數量。為了讓這個參數生效,您應該在YARN叢集中啟用CPU scheduling 。
失敗的容器(包括JobManager)将被YARN替換。JobManager容器重新開機的最大次數是通過 yarn.application-attempts[預設是1]配置的。當所有的嘗試都耗盡時,YARN應用程式将失敗。
5 User jars & Classpath
預設情況下,Flink在運作單個作業時将使用者jar包含到系統類路徑【system classpath】中。這種行為可以通過yarn.per-job-cluster.include-user-jar 參數進行控制。
當将此設定為DISABLED時,表示使用者jar被排除在系統類路徑之外。
使用者jar在類路徑中的位置可以通過将參數設定為以下參數之一來控制:
-
: (default) Adds the jar to the system classpath based on the lexicographic order.ORDER
-
: Adds the jar to the beginning of the system classpath.FIRST
-
: Adds the jar to the end of the system classpath.LAST
6 三種模式的差別
詳見 deployment
主要差別點如下:
- 叢集生命周期和資源隔離保證
- 應用程式的main()方法是在client上執行還是在cluster上執行。
- 在session mode中,如果其中一個作業行為不當或關閉了一個TaskManager,那麼在該TaskManager上運作的所有作業都将受到該失敗的影響
應用程式的main方法包括的過程如下:
- 在本地下載下傳應用程式的依賴項,
- 執行main()來提取Flink的運作時可以了解的應用程式的表示(即JobGraph),
- 并将依賴項和JobGraph發送到叢集
如果main在client執行,則client會成為一個沉重的資源消耗者,因為它可能需要大量的網絡帶寬來下載下傳依賴項并将二進制檔案發送到叢集,并需要CPU周期來執行main()。當client 伺服器在使用者之間共享時,這個問題會更加明顯。
Application Mode 與 Per-Job Mode的差別
- Application Mode的main代碼在cluster中執行,Per-Job Mode在client中執行
- 與Per-Job模式相比,Application模式允許送出由多個作業Job組成的應用程式。