作者:沙晟陽
前言
本文主要面向于初次接觸 Flink、或者對 Flink 有了解但是沒有實際操作過的同學。希望幫助大家更順利地上手使用 Flink,并着手相關開發調試工作。
課程内容包括:
- Flink 開發環境的部署和配置
- 運作 Flink 應用(包括:單機 Standalone 模式、多機 Standalone 模式和 Yarn 叢集模式)
一、Flink 開發環境部署和配置
Flink 是一個以 Java 及 Scala 作為開發語言的開源大資料項目,代碼開源在 GitHub 上,并使用 Maven 來編譯和建構項目。對于大部分使用 Flink 的同學來說,Java、Maven 和 Git 這三個工具是必不可少的,另外一個強大的 IDE 有助于我們更快的閱讀代碼、開發新功能以及修複 Bug。因為篇幅所限,我們不會詳述每個工具的安裝細節,但會給出必要的安裝建議。
關于開發測試環境,Mac OS、Linux 系統或者 Windows 都可以。如果使用的是 Windows 10 系統,建議使用 Windows 10 系統的 Linux 子系統來編譯和運作。
工具 | 注釋 |
---|---|
Java | Java 版本至少是Java 8,且最好選用 Java 8u51 及以上版本 |
Maven | 必須使用 Maven 3,建議使用 Maven 3.2.5。Maven 3.3.x 能夠編譯成功,但是在 Shade 一些 Dependencies 的過程中有些問題 |
Git | Flink 的代碼倉庫是: https://github.com/apache/flink |
建議選用社群已釋出的穩定分支,比如 Release-1.6 或者 Release-1.7。
1. 編譯 Flink 代碼
在我們配置好之前的幾個工具後,編譯 Flink 就非常簡單了,執行如下指令即可:
mvn clean install -DskipTests
# 或者
mvn clean package -DskipTests
常用編譯參數:
-Dfast 主要是忽略QA plugins和JavaDocs的編譯
-Dhadoop.version=2.6.1 指定hadoop版本
--settings=${maven_file_path} 顯式指定maven settings.xml配置檔案
當成功編譯完成後,能在目前 Flink 代碼目錄下的 flink-dist/target/子目錄 中看到如下檔案(不同的 Flink 代碼分支編譯出的版本号不同,這裡的版本号是 Flink 1.5.1):

其中有三個檔案可以留意一下:
版本 | |
---|---|
flink-1.5.1.tar.gz | Binary 的壓縮包 |
flink-1.5.1-bin/flink-1.5.1 | 解壓後的 Flink binary 目錄 |
flink-dist_2.11-1.5.1.jar | 包含 Flink 核心功能的 jar 包 |
注意:
國内使用者在編譯時可能遇到編譯失敗“Build Failure”(且有 MapR 相關報錯),一般都和 MapR 相關依賴的下載下傳失敗有關,即使使用了推薦的 settings.xml 配置(其中 Aliyun Maven 源專門為 MapR 相關依賴做了代理),還是可能出現下載下傳失敗的情況。問題主要和 MapR 的 Jar 包比較大有關。遇到這些問題時,重試即可。在重試之前,要先根據失敗資訊删除 Maven local repository 中對應的目錄,否則需要等待 Maven 下載下傳的逾時時間才能再次出發下載下傳依賴到本地。
2. 開發環境準備
推薦使用 IntelliJ IDEA IDE 作為 Flink 的 IDE 工具。官方不建議使用 Eclipse IDE,主要原因是 Eclipse 的 Scala IDE 和 Flink 用 Scala 的不相容。
如果你需要做一些 Flink 代碼的開發工作,則需要根據 Flink 代碼的 tools/maven/目錄 下的配置檔案來配置 Checkstyle ,因為 Flink 在編譯時會強制代碼風格的檢查,如果代碼風格不符合規範,可能會直接編譯失敗。
二、運作 Flink 應用
1. 基本概念
運作 Flink 應用其實非常簡單,但是在運作 Flink 應用之前,還是有必要了解 Flink 運作時的各個元件,因為這涉及到 Flink 應用的配置問題。圖 1 所示,這是使用者用 DataStream API 寫的一個資料處理程式。可以看到,在一個 DAG 圖中不能被 Chain 在一起的 Operator 會被分隔到不同的 Task 中,也就是說 Task 是 Flink 中資源排程的最小機關。
圖 1 Parallel Dataflows
圖 2 所示,Flink 實際運作時包括兩類程序:
- JobManager(又稱為 JobMaster):協調 Task 的分布式執行,包括排程 Task、協調創 Checkpoint 以及當 Job failover 時協調各個 Task 從 Checkpoint 恢複等。
- TaskManager(又稱為 Worker):執行 Dataflow 中的 Tasks,包括記憶體 Buffer 的配置設定、Data Stream 的傳遞等。
圖 2 Flink Runtime 架構圖
圖 3 所示,Task Slot 是一個 TaskManager 中的最小資源配置設定機關,一個 TaskManager 中有多少個 Task Slot 就意味着能支援多少并發的 Task 處理。需要注意的是,一個 Task Slot 中可以執行多個 Operator,一般這些 Operator 是能被 Chain 在一起處理的。
圖 3 Process
2. 運作環境準備
-
準備 Flink binary
直接從 Flink 官網上下載下傳 Flink binary 的壓縮包
或者從 Flink 源碼編譯而來
- 安裝 Java,并配置 JAVA_HOME 環境變量
3. 單機 Standalone 的方式運作 Flink
(1)基本的啟動流程
最簡單的運作 Flink 應用的方法就是以單機 Standalone 的方式運作。
啟動叢集:
./bin/start-cluster.sh
打開
http://127.0.0.1:8081/就能看到 Flink 的 Web 界面。嘗試送出 Word Count 任務:
./bin/flink run examples/streaming/WordCount.jar
大家可以自行探索 Web 界面中展示的資訊,比如,我們可以看看 TaskManager 的 stdout 日志,就可以看到 Word Count 的計算結果。
我們還可以嘗試通過“--input”參數指定我們自己的本地檔案作為輸入,然後執行:
./bin/flink run examples/streaming/WordCount.jar --input ${your_source_file}
停止叢集:
./bin/stop-cluster.sh
(2)常用配置介紹
- conf / slaves
conf / slaves 用于配置 TaskManager 的部署,預設配置下隻會啟動一個 TaskManager 程序,如果想增加一個 TaskManager 程序的,隻需要檔案中追加一行“localhost”。
也可以直接通過“ ./bin/taskmanager.sh start ”這個指令來追加一個新的 TaskManager:
./bin/taskmanager.sh start|start-foreground|stop|stop-all
- conf/flink-conf.yaml
conf/flink-conf.yaml 用于配置 JM 和 TM 的運作參數,常用配置有:
# The heap size for the JobManager JVM
jobmanager.heap.mb: 1024
# The heap size for the TaskManager JVM
taskmanager.heap.mb: 1024
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 4
# the managed memory size for each task manager.
taskmanager.managed.memory.size: 256
Standalone 叢集啟動後,我們可以嘗試分析一下 Flink 相關程序的運作情況。執行 jps 指令,可以看到 Flink 相關的程序主要有兩個,一個是 JobManager 程序,另一個是 TaskManager 程序。我們可以進一步用 ps 指令看看程序的啟動參數中“-Xmx”和“-Xms”的配置。然後我們可以嘗試修改 flink-conf.yaml 中若幹配置,然後重新開機 Standalone 叢集看看發生了什麼變化。
需要補充的是,在 Blink 開源分支上,TaskManager 的記憶體計算上相對于現在的社群版本要更精細化,TaskManager 程序的堆記憶體限制(-Xmx)一般的計算方法是:
TotalHeapMemory = taskmanager.heap.mb + taskmanager.managed.memory.size + taskmanager.process.heap.memory.mb(預設值為128MB)
而最新的 Flink 社群版本 Release-1.7 中 JobManager 和 TaskManager 預設記憶體配置方式為:
# The heap size for the JobManager JVM
jobmanager.heap.size: 1024m
# The heap size for the TaskManager JVM
taskmanager.heap.size: 1024m
Flink 社群 Release-1.7 版本中的“taskmanager.heap.size”配置實際上指的不是 Java heap 的記憶體限制,而是 TaskManager 程序總的記憶體限制。我們可以同樣用上述方法檢視 Release-1.7 版本的 Flink binary 啟動的 TaskManager 程序的 -Xmx 配置,會發現實際程序上的 -Xmx 要小于配置的“taskmanager.heap.size”的值,原因在于從中扣除了 Network buffer 用的記憶體,因為 Network buffer 用的記憶體一定是 Direct memory,是以不應該算在堆記憶體限制中。
(3)日志的檢視和配置
JobManager 和 TaskManager 的啟動日志可以在 Flink binary 目錄下的 Log 子目錄中找到。Log 目錄中以“flink-${user}-standalonesession-${id}-${hostname}”為字首的檔案對應的是 JobManager 的輸出,其中有三個檔案:
- flink-${user}-standalonesession-${id}-${hostname}.log:代碼中的日志輸出
- flink-${user}-standalonesession-${id}-${hostname}.out:程序執行時的stdout輸出
- flink-${user}-standalonesession-${id}-${hostname}-gc.log:JVM的GC的日志
Log 目錄中以“flink-${user}-taskexecutor-${id}-${hostname}”為字首的檔案對應的是 TaskManager 的輸出,也包括三個檔案,和 JobManager 的輸出一緻。
日志的配置檔案在 Flink binary 目錄的 conf 子目錄下,其中:
- log4j-cli.properties:用 Flink 指令行時用的 log 配置,比如執行“ flink run”指令
- log4j-yarn-session.properties:用 yarn-session.sh 啟動時指令行執行時用的 log 配置
- log4j.properties:無論是 Standalone 還是 Yarn 模式,JobManager 和 TaskManager 上用的 log 配置都是 log4j.properties
這三個“log4j.properties”檔案分别有三個“logback.xml”檔案與之對應,如果想使用 Logback 的同學,之需要把與之對應的“log4j.*properties”檔案删掉即可,對應關系如下:
- log4j-cli.properties -> logback-console.xml
- log4j-yarn-session.properties -> logback-yarn.xml
- log4j.properties -> logback.xml
需要注意的是,“flink-${user}-standalonesession-${id}-${hostname}”和“flink-${user}-taskexecutor-${id}-${hostname}”都帶有“${id}”,“${id}”表示本程序在本機上該角色(JobManager 或 TaskManager)的所有程序中的啟動順序,預設從 0 開始。
(4)進一步探索
嘗試重複執行“./bin/start-cluster.sh”指令,然後看看 Web 頁面(或者執行jps指令),看看會發生什麼?可以嘗試看看啟動腳本,分析一下原因。接着可以重複執行“./bin/stop-cluster.sh”,每次執行完後,看看會發生什麼。
4. 多機部署 Flink Standalone 叢集
部署前要注意的要點:
- 每台機器上配置好 Java 以及 JAVA_HOME 環境變量
- 每台機器上部署的 Flink binary 的目錄要保證是同一個目錄
- 如果需要用 HDFS,需要配置 HADOOP_CONF_DIR 環境變量配置
根據你的叢集資訊修改 conf/masters 和 conf/slaves 配置。
修改 conf/flink-conf.yaml 配置,注意要確定和 Masters 檔案中的位址一緻:
jobmanager.rpc.address: z05f06378.sqa.zth.tbsite.net
確定所有機器的 Flink binary 目錄中 conf 中的配置檔案相同,特别是以下三個:
conf/masters
conf/slaves
conf/flink-conf.yaml
然後啟動 Flink 叢集:
./bin/start-cluster.sh
送出 WordCount 作業:
./bin/flink run examples/streaming/WordCount.jar
上傳 WordCount 的 Input 檔案:
hdfs dfs -copyFromLocal story /test_dir/input_dir/story
送出讀寫 HDFS 的 WordCount 作業:
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
增加 WordCount 作業的并發度(注意輸出檔案重名會送出失敗):
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output --parallelism 20
5. Standalone 模式的 HighAvailability(HA)部署和配置
通過圖 2 Flink Runtime 架構圖,我們可以看到 JobManager 是整個系統中最可能導緻系統不可用的角色。如果一個 TaskManager 挂了,在資源足夠的情況下,隻需要把相關 Task 排程到其他空閑 TaskSlot 上,然後 Job 從 Checkpoint 中恢複即可。而如果目前叢集中隻配置了一個 JobManager,則一旦 JobManager 挂了,就必須等待這個 JobManager 重新恢複,如果恢複時間過長,就可能導緻整個 Job 失敗。
是以如果在生産業務使用 Standalone 模式,則需要部署配置 HighAvailability,這樣同時可以有多個 JobManager 待命,進而使得 JobManager 能夠持續服務。
圖 4 Flink JobManager HA 示意圖
- 如果想使用 Flink standalone HA 模式,需要確定基于 Flink Release-1.6.1 及以上版本,因為這裡社群有個 bug 會導緻這個模式下主 JobManager 不能正常工作。
- 接下來的實驗中需要用到 HDFS,是以需要下載下傳帶有 Hadoop 支援的 Flink Binary 包。
(1)(可選)使用 Flink 自帶的腳本部署 ZK
Flink 目前支援基于 Zookeeper 的 HA。如果你的叢集中沒有部署 ZK,Flink 提供了啟動 Zookeeper 叢集的腳本。首先修改配置檔案“conf/zoo.cfg”,根據你要部署的 Zookeeper Server 的機器數來配置“server.X=addressX:peerPort:leaderPort”,其中“X”是一個 Zookeeper Server的唯一 ID,且必須是數字。
# The port at which the clients will connect
clientPort=3181
server.1=z05f06378.sqa.zth.tbsite.net:4888:5888
server.2=z05c19426.sqa.zth.tbsite.net:4888:5888
server.3=z05f10219.sqa.zth.tbsite.net:4888:5888
然後啟動 Zookeeper:
./bin/start-zookeeper-quorum.sh
jps 指令看到 ZK 程序已經啟動:
停掉 Zookeeper 叢集的指令:
./bin/stop-zookeeper-quorum.sh
(2)修改 Flink Standalone 叢集的配置
修改 conf/masters 檔案,增加一個 JobManager:
$cat conf/masters
z05f06378.sqa.zth.tbsite.net:8081
z05c19426.sqa.zth.tbsite.net:8081
之前修改過的 conf/slaves 檔案保持不變:
$cat conf/slaves
z05f06378.sqa.zth.tbsite.net
z05c19426.sqa.zth.tbsite.net
z05f10219.sqa.zth.tbsite.net
修改 conf/flink-conf.yaml 檔案:
# 配置high-availability mode
high-availability: zookeeper
# 配置zookeeper quorum(hostname和端口需要依據對應zk的實際配置)
high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181
# (可選)設定zookeeper的root目錄
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root
# (可選)相當于是這個standalone叢集中建立的zk node的namespace
high-availability.cluster-id: /test_dir/test_standalone2
# JobManager的meta資訊放在dfs,在zk上主要會儲存一個指向dfs路徑的指針
high-availability.storageDir: hdfs:///test_dir/recovery2/
需要注意的是,在 HA 模式下 conf/flink-conf.yaml 中的這兩個配置都失效了(想想為什麼)。
jobmanager.rpc.address
jobmanager.rpc.port
修改完成後,確定配置同步到其他機器。
啟動 Zookeeper 叢集:
./bin/start-zookeeper-quorum.sh
再啟動 Standalone 叢集(要確定之前的 Standalone 叢集已經停掉):
./bin/start-cluster.sh
分别打開兩個 Master 節點上的 JobManager Web 頁面:
http://z05f06378.sqa.zth.tbsite.net:8081 http://z05c19426.sqa.zth.tbsite.net:8081可以看到兩個頁面最後都轉到了同一個位址上,這個位址就是目前主 JobManager 所在機器,另一個就是 Standby JobManager。以上我們就完成了 Standalone 模式下 HA 的配置。
接下來我們可以測試驗證 HA 的有效性。當我們知道主 JobManager 的機器後,我們可以把主 JobManager 程序 Kill 掉,比如目前主 JobManager 在 z05c19426.sqa.zth.tbsite.net 這個機器上,就把這個程序殺掉。
接着,再打開這兩個連結:
可以發現後一個連結已經不能展示了,而前一個連結可以展示,說明發生主備切換。
然後我們再重新開機前一次的主 JobManager:
./bin/jobmanager.sh start z05c19426.sqa.zth.tbsite.net 8081
再打開
這個連結,會發現現在這個連結可以轉到
這個頁面上了。說明這個 JobManager 完成了一個 Failover Recovery。
6. 使用 Yarn 模式跑 Flink job
圖 5 Flink Yarn 部署流程圖
相對于 Standalone 模式,Yarn 模式允許 Flink job 的好處有:
- 資源按需使用,提高叢集的資源使用率
- 任務有優先級,根據優先級運作作業
-
基于 Yarn 排程系統,能夠自動化地處理各個角色的 Failover
○ JobManager 程序和 TaskManager 程序都由 Yarn NodeManager 監控
○ 如果 JobManager 程序異常退出,則 Yarn ResourceManager 會重新排程 JobManager 到其他機器
○ 如果 TaskManager 程序異常退出,JobManager 會收到消息并重新向 Yarn ResourceManager 申請資源,重新啟動 TaskManager
(1)在 Yarn 上啟動 Long Running 的 Flink 叢集(Session Cluster 模式)
檢視指令參數:
./bin/yarn-session.sh -h
建立一個 Yarn 模式的 Flink 叢集:
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
其中用到的參數是:
- -n,--container Number of TaskManagers
- -jm,--jobManagerMemory Memory for JobManager Container with optional unit (default: MB)
- -tm,--taskManagerMemory Memory per TaskManager Container with optional unit (default: MB)
- -qu,--queue Specify YARN queue.
- -s,--slots Number of slots per TaskManager
- -t,--ship Ship files in the specified directory (t for transfer)
送出一個 Flink job 到 Flink 叢集:
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
這次送出 Flink job,雖然沒有指定對應 Yarn application 的資訊,卻可以送出到對應的 Flink 叢集,原因在于“/tmp/.yarn-properties-${user}”檔案中儲存了上一次建立 Yarn session 的叢集資訊。是以如果同一使用者在同一機器上再次建立一個 Yarn session,則這個檔案會被覆寫掉。
-
如果删掉“/tmp/.yarn-properties-${user}”或者在另一個機器上送出作業能否送出到預期到yarn session中呢?
可以配置了“high-availability.cluster-id”參數,據此從 Zookeeper 上擷取到 JobManager 的位址和端口,進而送出作業。
- 如果 Yarn session 沒有配置 HA,又該如何送出呢?
這個時候就必須要在送出 Flink job 的指令中指明 Yarn 上的 Application ID,通過“-yid”參數傳入:
/bin/flink run -yid application_1548056325049_0048 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
我們可以發現,每次跑完任務不久,TaskManager 就被釋放了,下次在送出任務的時候,TaskManager 又會重新拉起來。如果希望延長空閑 TaskManager 的逾時時間,可以在 conf/flink-conf.yaml 檔案中配置下面這個參數,機關是 milliseconds:
slotmanager.taskmanager-timeout: 30000L # deprecated, used in release-1.5
resourcemanager.taskmanager-timeout: 30000L
(2)在 Yarn 上運作單個 Flink job(Job Cluster 模式)
如果你隻想運作單個 Flink Job 後就退出,那麼可以用下面這個指令:
./bin/flink run -m yarn-cluster -yn 2 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
常用的配置有:
- -yn,--yarncontainer Number of Task Managers
- -yqu,--yarnqueue Specify YARN queue.
- -ys,--yarnslots Number of slots per TaskManager
可以通過 Help 指令檢視 Run 的可用參數:
./bin/flink run -h
我們可以看到,“./bin/flink run -h”看到的“Options for yarn-cluster mode”中的“-y”和“--yarn”為字首的參數其實和“./bin/yarn-session.sh -h”指令是一一對應的,語義上也基本一緻。
關于“-n”(在yarn session模式下)、“-yn”在(yarn single job模式下)與“-p”參數的關系:
- “-n”和“-yn”在社群版本中(Release-1.5 ~ Release-1.7)中沒有實際的控制作用,實際的資源是根據“-p”參數來申請的,并且 TM 使用完後就會歸還
- 在 Blink 的開源版本中,“-n”(在 Yarn Session 模式下)的作用就是一開始啟動指定數量的 TaskManager,之後即使 Job 需要更多的 Slot,也不會申請新的 TaskManager
- 在 Blink 的開源版本中,Yarn single job 模式“-yn”表示的是初始 TaskManager 的數量,不設定 TaskManager 的上限。(需要特别注意的是,隻有加上“-yd”參數才能用 Single job 模式(例如:指令“./bin/flink run -yd -m yarn-cluster xxx”)
7. Yarn 模式下的 HighAvailability 配置
首先要確定啟動 Yarn 叢集用的“yarn-site.xml”檔案中的這個配置,這個是 Yarn 叢集級别 AM 重新開機的上限。
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>100</value>
</property>
然後在 conf/flink-conf.yaml 檔案中配置這個 Flink job 的 JobManager 能夠重新開機的次數。
yarn.application-attempts: 10 # 1+ 9 retries
最後再在 conf/flink-conf.yaml 檔案中配置上 ZK 相關配置,這幾個配置的配置方法和 Standalone 的 HA 配置方法基本一緻,如下所示。
# 配置high-availability mode
high-availability: zookeeper
# 配置zookeeper quorum(hostname和端口需要依據對應zk的實際配置)
high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181
# (可選)設定zookeeper的root目錄
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root
# 删除這個配置
# high-availability.cluster-id: /test_dir/test_standalone2
# JobManager的meta資訊放在dfs,在zk上主要會儲存一個指向dfs路徑的指針
high-availability.storageDir: hdfs:///test_dir/recovery2/
需要特别注意的是:“high-availability.cluster-id”這個配置最好去掉,因為在 Yarn(以及Mesos)模式下,cluster-id 如果不配置的話,會配置成 Yarn 上的 Application ID ,進而可以保證唯一性。