天天看點

Apache Flink 零基礎入門(二):開發環境搭建和應用的配置、部署及運作

作者:沙晟陽

前言

本文主要面向于初次接觸 Flink、或者對 Flink 有了解但是沒有實際操作過的同學。希望幫助大家更順利地上手使用 Flink,并着手相關開發調試工作。

課程内容包括:

  • Flink 開發環境的部署和配置
  • 運作 Flink 應用(包括:單機 Standalone 模式、多機 Standalone 模式和 Yarn 叢集模式)

一、Flink 開發環境部署和配置

Flink 是一個以 Java 及 Scala 作為開發語言的開源大資料項目,代碼開源在 GitHub 上,并使用 Maven 來編譯和建構項目。對于大部分使用 Flink 的同學來說,Java、Maven 和 Git 這三個工具是必不可少的,另外一個強大的 IDE 有助于我們更快的閱讀代碼、開發新功能以及修複 Bug。因為篇幅所限,我們不會詳述每個工具的安裝細節,但會給出必要的安裝建議。

關于開發測試環境,Mac OS、Linux 系統或者 Windows 都可以。如果使用的是 Windows 10 系統,建議使用 Windows 10 系統的 Linux 子系統來編譯和運作。

工具 注釋
Java Java 版本至少是Java 8,且最好選用 Java 8u51 及以上版本
Maven 必須使用 Maven 3,建議使用 Maven 3.2.5。Maven 3.3.x 能夠編譯成功,但是在 Shade 一些 Dependencies 的過程中有些問題
Git Flink 的代碼倉庫是: https://github.com/apache/flink

建議選用社群已釋出的穩定分支,比如 Release-1.6 或者 Release-1.7。

1. 編譯 Flink 代碼

在我們配置好之前的幾個工具後,編譯 Flink 就非常簡單了,執行如下指令即可:

mvn clean install -DskipTests
# 或者
mvn clean package -DskipTests            

常用編譯參數:

-Dfast    主要是忽略QA plugins和JavaDocs的編譯
-Dhadoop.version=2.6.1    指定hadoop版本
--settings=${maven_file_path}    顯式指定maven settings.xml配置檔案            

當成功編譯完成後,能在目前 Flink 代碼目錄下的 flink-dist/target/子目錄 中看到如下檔案(不同的 Flink 代碼分支編譯出的版本号不同,這裡的版本号是 Flink 1.5.1):

Apache Flink 零基礎入門(二):開發環境搭建和應用的配置、部署及運作

其中有三個檔案可以留意一下:

版本
flink-1.5.1.tar.gz Binary 的壓縮包
flink-1.5.1-bin/flink-1.5.1 解壓後的 Flink binary 目錄
flink-dist_2.11-1.5.1.jar 包含 Flink 核心功能的 jar 包
注意:

國内使用者在編譯時可能遇到編譯失敗“Build Failure”(且有 MapR 相關報錯),一般都和 MapR 相關依賴的下載下傳失敗有關,即使使用了推薦的 settings.xml 配置(其中 Aliyun Maven 源專門為 MapR 相關依賴做了代理),還是可能出現下載下傳失敗的情況。問題主要和 MapR 的 Jar 包比較大有關。遇到這些問題時,重試即可。在重試之前,要先根據失敗資訊删除 Maven local repository 中對應的目錄,否則需要等待 Maven 下載下傳的逾時時間才能再次出發下載下傳依賴到本地。

2. 開發環境準備

推薦使用 IntelliJ IDEA IDE 作為 Flink 的 IDE 工具。官方不建議使用 Eclipse IDE,主要原因是 Eclipse 的 Scala IDE 和 Flink 用 Scala 的不相容。

如果你需要做一些 Flink 代碼的開發工作,則需要根據 Flink 代碼的 tools/maven/目錄 下的配置檔案來配置 Checkstyle ,因為 Flink 在編譯時會強制代碼風格的檢查,如果代碼風格不符合規範,可能會直接編譯失敗。

二、運作 Flink 應用

1. 基本概念

運作 Flink 應用其實非常簡單,但是在運作 Flink 應用之前,還是有必要了解 Flink 運作時的各個元件,因為這涉及到 Flink 應用的配置問題。圖 1 所示,這是使用者用 DataStream API 寫的一個資料處理程式。可以看到,在一個 DAG 圖中不能被 Chain 在一起的 Operator 會被分隔到不同的 Task 中,也就是說 Task 是 Flink 中資源排程的最小機關。

Apache Flink 零基礎入門(二):開發環境搭建和應用的配置、部署及運作

圖 1 Parallel Dataflows

圖 2 所示,Flink 實際運作時包括兩類程序:

  • JobManager(又稱為 JobMaster):協調 Task 的分布式執行,包括排程 Task、協調創 Checkpoint 以及當 Job failover 時協調各個 Task 從 Checkpoint 恢複等。
  • TaskManager(又稱為 Worker):執行 Dataflow 中的 Tasks,包括記憶體 Buffer 的配置設定、Data Stream 的傳遞等。
Apache Flink 零基礎入門(二):開發環境搭建和應用的配置、部署及運作

圖 2 Flink Runtime 架構圖

圖 3 所示,Task Slot 是一個 TaskManager 中的最小資源配置設定機關,一個 TaskManager 中有多少個 Task Slot 就意味着能支援多少并發的 Task 處理。需要注意的是,一個 Task Slot 中可以執行多個 Operator,一般這些 Operator 是能被 Chain 在一起處理的。

Apache Flink 零基礎入門(二):開發環境搭建和應用的配置、部署及運作

圖 3 Process

2. 運作環境準備

  • 準備 Flink binary

    直接從 Flink 官網上下載下傳 Flink binary 的壓縮包

或者從 Flink 源碼編譯而來

  • 安裝 Java,并配置 JAVA_HOME 環境變量

3. 單機 Standalone 的方式運作 Flink

(1)基本的啟動流程

最簡單的運作 Flink 應用的方法就是以單機 Standalone 的方式運作。

啟動叢集:

./bin/start-cluster.sh           

打開

http://127.0.0.1:8081/

就能看到 Flink 的 Web 界面。嘗試送出 Word Count 任務:

./bin/flink run examples/streaming/WordCount.jar           

大家可以自行探索 Web 界面中展示的資訊,比如,我們可以看看 TaskManager 的 stdout 日志,就可以看到 Word Count 的計算結果。

我們還可以嘗試通過“--input”參數指定我們自己的本地檔案作為輸入,然後執行:

./bin/flink run examples/streaming/WordCount.jar --input ${your_source_file}           

停止叢集:

./bin/stop-cluster.sh           

(2)常用配置介紹

  • conf / slaves

conf / slaves 用于配置 TaskManager 的部署,預設配置下隻會啟動一個 TaskManager 程序,如果想增加一個 TaskManager 程序的,隻需要檔案中追加一行“localhost”。

也可以直接通過“ ./bin/taskmanager.sh start ”這個指令來追加一個新的 TaskManager:

./bin/taskmanager.sh start|start-foreground|stop|stop-all           
  • conf/flink-conf.yaml

conf/flink-conf.yaml 用于配置 JM 和 TM 的運作參數,常用配置有:

# The heap size for the JobManager JVM
jobmanager.heap.mb: 1024

# The heap size for the TaskManager JVM
taskmanager.heap.mb: 1024

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 4

# the managed memory size for each task manager.
taskmanager.managed.memory.size: 256           

Standalone 叢集啟動後,我們可以嘗試分析一下 Flink 相關程序的運作情況。執行 jps 指令,可以看到 Flink 相關的程序主要有兩個,一個是 JobManager 程序,另一個是 TaskManager 程序。我們可以進一步用 ps 指令看看程序的啟動參數中“-Xmx”和“-Xms”的配置。然後我們可以嘗試修改 flink-conf.yaml 中若幹配置,然後重新開機 Standalone 叢集看看發生了什麼變化。

需要補充的是,在 Blink 開源分支上,TaskManager 的記憶體計算上相對于現在的社群版本要更精細化,TaskManager 程序的堆記憶體限制(-Xmx)一般的計算方法是:

TotalHeapMemory = taskmanager.heap.mb + taskmanager.managed.memory.size + taskmanager.process.heap.memory.mb(預設值為128MB)           

而最新的 Flink 社群版本 Release-1.7 中 JobManager 和 TaskManager 預設記憶體配置方式為:

# The heap size for the JobManager JVM
jobmanager.heap.size: 1024m

# The heap size for the TaskManager JVM
taskmanager.heap.size: 1024m           

Flink 社群 Release-1.7 版本中的“taskmanager.heap.size”配置實際上指的不是 Java heap 的記憶體限制,而是 TaskManager 程序總的記憶體限制。我們可以同樣用上述方法檢視 Release-1.7 版本的 Flink binary 啟動的 TaskManager 程序的 -Xmx 配置,會發現實際程序上的 -Xmx 要小于配置的“taskmanager.heap.size”的值,原因在于從中扣除了 Network buffer 用的記憶體,因為 Network buffer 用的記憶體一定是 Direct memory,是以不應該算在堆記憶體限制中。

(3)日志的檢視和配置

JobManager 和 TaskManager 的啟動日志可以在 Flink binary 目錄下的 Log 子目錄中找到。Log 目錄中以“flink-${user}-standalonesession-${id}-${hostname}”為字首的檔案對應的是 JobManager 的輸出,其中有三個檔案:

  • flink-${user}-standalonesession-${id}-${hostname}.log:代碼中的日志輸出
  • flink-${user}-standalonesession-${id}-${hostname}.out:程序執行時的stdout輸出
  • flink-${user}-standalonesession-${id}-${hostname}-gc.log:JVM的GC的日志

Log 目錄中以“flink-${user}-taskexecutor-${id}-${hostname}”為字首的檔案對應的是 TaskManager 的輸出,也包括三個檔案,和 JobManager 的輸出一緻。

日志的配置檔案在 Flink binary 目錄的 conf 子目錄下,其中:

  • log4j-cli.properties:用 Flink 指令行時用的 log 配置,比如執行“ flink run”指令
  • log4j-yarn-session.properties:用 yarn-session.sh 啟動時指令行執行時用的 log 配置
  • log4j.properties:無論是 Standalone 還是 Yarn 模式,JobManager 和 TaskManager 上用的 log 配置都是 log4j.properties

這三個“log4j.properties”檔案分别有三個“logback.xml”檔案與之對應,如果想使用 Logback 的同學,之需要把與之對應的“log4j.*properties”檔案删掉即可,對應關系如下:

  • log4j-cli.properties -> logback-console.xml
  • log4j-yarn-session.properties -> logback-yarn.xml
  • log4j.properties -> logback.xml

需要注意的是,“flink-${user}-standalonesession-${id}-${hostname}”和“flink-${user}-taskexecutor-${id}-${hostname}”都帶有“${id}”,“${id}”表示本程序在本機上該角色(JobManager 或 TaskManager)的所有程序中的啟動順序,預設從 0 開始。

(4)進一步探索

嘗試重複執行“./bin/start-cluster.sh”指令,然後看看 Web 頁面(或者執行jps指令),看看會發生什麼?可以嘗試看看啟動腳本,分析一下原因。接着可以重複執行“./bin/stop-cluster.sh”,每次執行完後,看看會發生什麼。

4. 多機部署 Flink Standalone 叢集

部署前要注意的要點:

  • 每台機器上配置好 Java 以及 JAVA_HOME 環境變量
  • 每台機器上部署的 Flink binary 的目錄要保證是同一個目錄
  • 如果需要用 HDFS,需要配置 HADOOP_CONF_DIR 環境變量配置

根據你的叢集資訊修改 conf/masters 和 conf/slaves 配置。

修改 conf/flink-conf.yaml 配置,注意要確定和 Masters 檔案中的位址一緻:

jobmanager.rpc.address: z05f06378.sqa.zth.tbsite.net           

確定所有機器的 Flink binary 目錄中 conf 中的配置檔案相同,特别是以下三個:

conf/masters
conf/slaves
conf/flink-conf.yaml           

然後啟動 Flink 叢集:

./bin/start-cluster.sh           

送出 WordCount 作業:

./bin/flink run examples/streaming/WordCount.jar           

上傳 WordCount 的 Input 檔案:

hdfs dfs -copyFromLocal story /test_dir/input_dir/story           

送出讀寫 HDFS 的 WordCount 作業:

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output           

增加 WordCount 作業的并發度(注意輸出檔案重名會送出失敗):

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output --parallelism 20           

5. Standalone 模式的 HighAvailability(HA)部署和配置

通過圖 2 Flink Runtime 架構圖,我們可以看到 JobManager 是整個系統中最可能導緻系統不可用的角色。如果一個 TaskManager 挂了,在資源足夠的情況下,隻需要把相關 Task 排程到其他空閑 TaskSlot 上,然後 Job 從 Checkpoint 中恢複即可。而如果目前叢集中隻配置了一個 JobManager,則一旦 JobManager 挂了,就必須等待這個 JobManager 重新恢複,如果恢複時間過長,就可能導緻整個 Job 失敗。

是以如果在生産業務使用 Standalone 模式,則需要部署配置 HighAvailability,這樣同時可以有多個 JobManager 待命,進而使得 JobManager 能夠持續服務。

Apache Flink 零基礎入門(二):開發環境搭建和應用的配置、部署及運作

圖 4 Flink JobManager HA 示意圖

  • 如果想使用 Flink standalone HA 模式,需要確定基于 Flink Release-1.6.1 及以上版本,因為這裡社群有個 bug 會導緻這個模式下主 JobManager 不能正常工作。
  • 接下來的實驗中需要用到 HDFS,是以需要下載下傳帶有 Hadoop 支援的 Flink Binary 包。

(1)(可選)使用 Flink 自帶的腳本部署 ZK

Flink 目前支援基于 Zookeeper 的 HA。如果你的叢集中沒有部署 ZK,Flink 提供了啟動 Zookeeper 叢集的腳本。首先修改配置檔案“conf/zoo.cfg”,根據你要部署的 Zookeeper Server 的機器數來配置“server.X=addressX:peerPort:leaderPort”,其中“X”是一個 Zookeeper Server的唯一 ID,且必須是數字。

# The port at which the clients will connect
clientPort=3181

server.1=z05f06378.sqa.zth.tbsite.net:4888:5888
server.2=z05c19426.sqa.zth.tbsite.net:4888:5888
server.3=z05f10219.sqa.zth.tbsite.net:4888:5888           

然後啟動 Zookeeper:

./bin/start-zookeeper-quorum.sh           

jps 指令看到 ZK 程序已經啟動:

Apache Flink 零基礎入門(二):開發環境搭建和應用的配置、部署及運作

停掉 Zookeeper 叢集的指令:

./bin/stop-zookeeper-quorum.sh           

(2)修改 Flink Standalone 叢集的配置

修改 conf/masters 檔案,增加一個 JobManager:

$cat conf/masters
z05f06378.sqa.zth.tbsite.net:8081
z05c19426.sqa.zth.tbsite.net:8081           

之前修改過的 conf/slaves 檔案保持不變:

$cat conf/slaves
z05f06378.sqa.zth.tbsite.net
z05c19426.sqa.zth.tbsite.net
z05f10219.sqa.zth.tbsite.net           

修改 conf/flink-conf.yaml 檔案:

# 配置high-availability mode
high-availability: zookeeper

# 配置zookeeper quorum(hostname和端口需要依據對應zk的實際配置)
high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181

# (可選)設定zookeeper的root目錄
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root

# (可選)相當于是這個standalone叢集中建立的zk node的namespace
high-availability.cluster-id: /test_dir/test_standalone2

# JobManager的meta資訊放在dfs,在zk上主要會儲存一個指向dfs路徑的指針
high-availability.storageDir: hdfs:///test_dir/recovery2/           

需要注意的是,在 HA 模式下 conf/flink-conf.yaml 中的這兩個配置都失效了(想想為什麼)。

jobmanager.rpc.address
jobmanager.rpc.port           

修改完成後,確定配置同步到其他機器。

啟動 Zookeeper 叢集:

./bin/start-zookeeper-quorum.sh           

再啟動 Standalone 叢集(要確定之前的 Standalone 叢集已經停掉):

./bin/start-cluster.sh           

分别打開兩個 Master 節點上的 JobManager Web 頁面:

http://z05f06378.sqa.zth.tbsite.net:8081 http://z05c19426.sqa.zth.tbsite.net:8081

可以看到兩個頁面最後都轉到了同一個位址上,這個位址就是目前主 JobManager 所在機器,另一個就是 Standby JobManager。以上我們就完成了 Standalone 模式下 HA 的配置。

接下來我們可以測試驗證 HA 的有效性。當我們知道主 JobManager 的機器後,我們可以把主 JobManager 程序 Kill 掉,比如目前主 JobManager 在 z05c19426.sqa.zth.tbsite.net 這個機器上,就把這個程序殺掉。

接着,再打開這兩個連結:

可以發現後一個連結已經不能展示了,而前一個連結可以展示,說明發生主備切換。

然後我們再重新開機前一次的主 JobManager:

./bin/jobmanager.sh start z05c19426.sqa.zth.tbsite.net 8081           

再打開

這個連結,會發現現在這個連結可以轉到

這個頁面上了。說明這個 JobManager 完成了一個 Failover Recovery。

6. 使用 Yarn 模式跑 Flink job

Apache Flink 零基礎入門(二):開發環境搭建和應用的配置、部署及運作

圖 5 Flink Yarn 部署流程圖

相對于 Standalone 模式,Yarn 模式允許 Flink job 的好處有:

  • 資源按需使用,提高叢集的資源使用率
  • 任務有優先級,根據優先級運作作業
  • 基于 Yarn 排程系統,能夠自動化地處理各個角色的 Failover

    ○ JobManager 程序和 TaskManager 程序都由 Yarn NodeManager 監控

○ 如果 JobManager 程序異常退出,則 Yarn ResourceManager 會重新排程 JobManager 到其他機器

○ 如果 TaskManager 程序異常退出,JobManager 會收到消息并重新向 Yarn ResourceManager 申請資源,重新啟動 TaskManager

(1)在 Yarn 上啟動 Long Running 的 Flink 叢集(Session Cluster 模式)

檢視指令參數:

./bin/yarn-session.sh -h           

建立一個 Yarn 模式的 Flink 叢集:

./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m           

其中用到的參數是:

  • -n,--container Number of TaskManagers
  • -jm,--jobManagerMemory Memory for JobManager Container with optional unit (default: MB)
  • -tm,--taskManagerMemory Memory per TaskManager Container with optional unit (default: MB)
  • -qu,--queue Specify YARN queue.
  • -s,--slots Number of slots per TaskManager
  • -t,--ship Ship files in the specified directory (t for transfer)

送出一個 Flink job 到 Flink 叢集:

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output           

這次送出 Flink job,雖然沒有指定對應 Yarn application 的資訊,卻可以送出到對應的 Flink 叢集,原因在于“/tmp/.yarn-properties-${user}”檔案中儲存了上一次建立 Yarn session 的叢集資訊。是以如果同一使用者在同一機器上再次建立一個 Yarn session,則這個檔案會被覆寫掉。

  • 如果删掉“/tmp/.yarn-properties-${user}”或者在另一個機器上送出作業能否送出到預期到yarn session中呢?

    可以配置了“high-availability.cluster-id”參數,據此從 Zookeeper 上擷取到 JobManager 的位址和端口,進而送出作業。

  • 如果 Yarn session 沒有配置 HA,又該如何送出呢?

這個時候就必須要在送出 Flink job 的指令中指明 Yarn 上的 Application ID,通過“-yid”參數傳入:

/bin/flink run -yid application_1548056325049_0048 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output           

我們可以發現,每次跑完任務不久,TaskManager 就被釋放了,下次在送出任務的時候,TaskManager 又會重新拉起來。如果希望延長空閑 TaskManager 的逾時時間,可以在 conf/flink-conf.yaml 檔案中配置下面這個參數,機關是 milliseconds:

slotmanager.taskmanager-timeout: 30000L         # deprecated, used in release-1.5
resourcemanager.taskmanager-timeout: 30000L           

(2)在 Yarn 上運作單個 Flink job(Job Cluster 模式)

如果你隻想運作單個 Flink Job 後就退出,那麼可以用下面這個指令:

./bin/flink run -m yarn-cluster -yn 2 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output           

常用的配置有:

  • -yn,--yarncontainer Number of Task Managers
  • -yqu,--yarnqueue Specify YARN queue.
  • -ys,--yarnslots Number of slots per TaskManager

可以通過 Help 指令檢視 Run 的可用參數:

./bin/flink run -h           

我們可以看到,“./bin/flink run -h”看到的“Options for yarn-cluster mode”中的“-y”和“--yarn”為字首的參數其實和“./bin/yarn-session.sh -h”指令是一一對應的,語義上也基本一緻。

關于“-n”(在yarn session模式下)、“-yn”在(yarn single job模式下)與“-p”參數的關系:

  • “-n”和“-yn”在社群版本中(Release-1.5 ~ Release-1.7)中沒有實際的控制作用,實際的資源是根據“-p”參數來申請的,并且 TM 使用完後就會歸還
  • 在 Blink 的開源版本中,“-n”(在 Yarn Session 模式下)的作用就是一開始啟動指定數量的 TaskManager,之後即使 Job 需要更多的 Slot,也不會申請新的 TaskManager
  • 在 Blink 的開源版本中,Yarn single job 模式“-yn”表示的是初始 TaskManager 的數量,不設定 TaskManager 的上限。(需要特别注意的是,隻有加上“-yd”參數才能用 Single job 模式(例如:指令“./bin/flink run -yd -m yarn-cluster xxx”)

7. Yarn 模式下的 HighAvailability 配置

首先要確定啟動 Yarn 叢集用的“yarn-site.xml”檔案中的這個配置,這個是 Yarn 叢集級别 AM 重新開機的上限。

<property>
      <name>yarn.resourcemanager.am.max-attempts</name>
      <value>100</value>
    </property>           

然後在 conf/flink-conf.yaml 檔案中配置這個 Flink job 的 JobManager 能夠重新開機的次數。

yarn.application-attempts: 10     # 1+ 9 retries           

最後再在 conf/flink-conf.yaml 檔案中配置上 ZK 相關配置,這幾個配置的配置方法和 Standalone 的 HA 配置方法基本一緻,如下所示。

# 配置high-availability mode
high-availability: zookeeper

# 配置zookeeper quorum(hostname和端口需要依據對應zk的實際配置)
high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181

# (可選)設定zookeeper的root目錄
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root

# 删除這個配置
# high-availability.cluster-id: /test_dir/test_standalone2

# JobManager的meta資訊放在dfs,在zk上主要會儲存一個指向dfs路徑的指針
high-availability.storageDir: hdfs:///test_dir/recovery2/           

需要特别注意的是:“high-availability.cluster-id”這個配置最好去掉,因為在 Yarn(以及Mesos)模式下,cluster-id 如果不配置的話,會配置成 Yarn 上的 Application ID ,進而可以保證唯一性。