天天看點

《從0到1學習Flink》—— Flink JobManager 高可用性配置

轉載請務必注明原創位址為:http://www.54tianzhisheng.cn/2019/01/13/Flink-JobManager-High-availability/ , 未經允許禁止轉載。

《從0到1學習Flink》—— Flink JobManager 高可用性配置

前言

之前在 《從0到1學習Flink》—— Flink 配置檔案詳解 講過 Flink 的配置,但是後面陸續有人來問我一些配置相關的東西,在加上我現在對 Flink 也更熟悉了些,這裡我就再寫下 Flink JobManager 的配置相關資訊。

在 《從0到1學習Flink》—— Apache Flink 介紹 一文中介紹過了 Flink Job 的運作架構圖:

《從0到1學習Flink》—— Flink JobManager 高可用性配置

JobManager 協調每個 Flink 作業的部署。它負責排程和資源管理。

預設情況下,每個 Flink 叢集都有一個 JobManager 執行個體。這會産生單點故障(SPOF):如果 JobManager 崩潰,則無法送出新作業且運作中的作業也會失敗。

如果我們使用 JobManager 高可用模式,可以避免這個問題。您可以為 standalone 叢集和 YARN 叢集配置高可用模式。

standalone 叢集高可用性

standalone 叢集的 JobManager 高可用性的概念是,任何時候都有一個主 JobManager 和 多個備 JobManagers,以便在主節點失敗時有新的 JobNamager 接管叢集。這樣就保證了沒有單點故障,一旦備 JobManager 接管叢集,作業就可以依舊正常運作。主備 JobManager 執行個體之間沒有明确的差別。每個 JobManager 都可以充當主備節點。

例如,請考慮以下三個 JobManager 執行個體的設定:

《從0到1學習Flink》—— Flink JobManager 高可用性配置

如何配置

要啟用 JobManager 高可用性功能,您必須将高可用性模式設定為 zookeeper,配置 ZooKeeper quorum,将所有 JobManagers 主機及其 Web UI 端口寫入配置檔案。

Flink 利用 ZooKeeper 在所有正在運作的 JobManager 執行個體之間進行分布式協調。ZooKeeper 是獨立于 Flink 的服務,通過 leader 選舉和輕量級一緻性狀态存儲提供高可靠的分布式協調服務。Flink 包含用于 Bootstrap ZooKeeper 安裝的腳本。

他在我們的 Flink 安裝路徑下面 /conf/zoo.cfg 。

《從0到1學習Flink》—— Flink JobManager 高可用性配置

Masters 檔案

要啟動 HA 叢集,請在以下位置配置 Masters 檔案 conf/masters:

localhost:8081
xxx.xxx.xxx.xxx:8081
           

masters 檔案包含啟動 JobManagers 的所有主機以及 Web 使用者界面綁定的端口,上面一行寫一個。

預設情況下,job manager 選一個随機端口作為程序通信端口。您可以通過 high-availability.jobmanager.port 更改此設定。此配置接受單個端口(例如 50010),範圍(50000-50025)或兩者的組合(50010,50011,50020-50025,50050-50075)。

配置檔案 (flink-conf.yaml)

要啟動 HA 叢集,請将以下配置鍵添加到 conf/flink-conf.yaml:

高可用性模式(必需):在 conf/flink-conf.yaml中,必須将高可用性模式設定為 zookeeper,以打開高可用模式。

high-availability: zookeeper
           

ZooKeeper quorum(必需):ZooKeeper quorum 是一組 ZooKeeper 伺服器,它提供分布式協調服務。

high-availability.zookeeper.quorum: ip1:2181 [,...],ip2:2181
           

每個 ip:port 都是一個 ZooKeeper 伺服器的 ip 及其端口,Flink 可以通過指定的位址和端口通路 zookeeper。

《從0到1學習Flink》—— Flink JobManager 高可用性配置

另外就是高可用存儲目錄,JobManager 中繼資料儲存在檔案系統 storageDir 中,在 ZooKeeper 中僅儲存了指向此狀态的指針, 推薦這個目錄是 HDFS, S3, Ceph, nfs 等,該 storageDir 中儲存了 JobManager 恢複狀态需要的所有中繼資料。

high-availability.storageDir: hdfs:///flink/ha/
           

配置 master 檔案和 ZooKeeper 配置後,您可以使用提供的叢集啟動腳本。他們将啟動 HA 叢集。請注意,啟動 Flink HA 叢集前,必須啟動 Zookeeper 叢集,并確定為要啟動的每個 HA 叢集配置單獨的 ZooKeeper 根路徑。

示例

具有 2 個 JobManagers 的 Standalone 叢集:

1、在 conf/flink-conf.yaml 中配置高可用模式和 Zookeeper :

high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.storageDir: hdfs:///flink/recovery
           

2、在 conf/masters 中 配置 masters:

localhost:8081
localhost:8082
           

3、在 conf/zoo.cfg 中配置 Zookeeper 服務:

server.0=localhost:2888:3888
           

4、啟動 ZooKeeper 叢集:

$ bin/start-zookeeper-quorum.sh
Starting zookeeper daemon on host localhost.
           

5、啟動一個 Flink HA 叢集:

$ bin/start-cluster.sh
Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
Starting jobmanager daemon on host localhost.
Starting jobmanager daemon on host localhost.
Starting taskmanager daemon on host localhost.
           

6、停止 ZooKeeper 和叢集:

$ bin/stop-cluster.sh
Stopping taskmanager daemon (pid: 7647) on localhost.
Stopping jobmanager daemon (pid: 7495) on host localhost.
Stopping jobmanager daemon (pid: 7349) on host localhost.

$ bin/stop-zookeeper-quorum.sh
Stopping zookeeper daemon (pid: 7101) on host localhost.
           

上面的執行腳本如下圖可見:

《從0到1學習Flink》—— Flink JobManager 高可用性配置

YARN 叢集高可用性

當運作高可用的 YARN 叢集時,我們不會運作多個 JobManager 執行個體,而隻會運作一個,該 JobManager 執行個體失敗時,YARN 會将其重新啟動。Yarn 的具體行為取決于您使用的 YARN 版本。

如何配置?

Application Master 最大重試次數 (yarn-site.xml)

在 YARN 配置檔案 yarn-site.xml 中,需要配置 application master 的最大重試次數:

<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
    The maximum number of application master execution attempts.
  </description>
</property>
           

目前 YARN 版本的預設值為 2(表示允許單個 JobManager 失敗兩次)。

Application Attempts (flink-conf.yaml)

除了上面可以配置最大重試次數外,你還可以在 flink-conf.yaml 配置如下:

yarn.application-attempts: 10
           

這意味着在如果程式啟動失敗,YARN 會再重試 9 次(9 次重試 + 1 次啟動),如果啟動 10 次作業還失敗,yarn 才會将該任務的狀态置為失敗。如果因為節點硬體故障或重新開機,NodeManager 重新同步等操作,需要 YARN 繼續嘗試啟動應用。這些重新開機嘗試不計入 yarn.application-attempts 個數中。

容器關閉行為

  • YARN 2.3.0 < 版本 < 2.4.0. 如果 application master 程序失敗,則所有的 container 都會重新開機。
  • YARN 2.4.0 < 版本 < 2.6.0. TaskManager container 在 application master 故障期間,會繼續工作。這具有以下優點:作業恢複時間更快,且縮短所有 task manager 啟動時申請資源的時間。
  • YARN 2.6.0 <= version: 将嘗試失敗有效性間隔設定為 Flink 的 Akka 逾時值。嘗試失敗有效性間隔表示隻有在系統在一個間隔期間看到最大應用程式嘗試次數後才會終止應用程式。這避免了持久的工作會耗盡它的應用程式嘗試。

示例:高可用的 YARN Session

1、配置 HA 模式和 Zookeeper 叢集 在 conf/flink-conf.yaml:

high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
yarn.application-attempts: 10
           

2、配置 ZooKeeper 服務 在 conf/zoo.cfg:

server.0=localhost:2888:3888
           

3、啟動 Zookeeper 叢集:

$ bin/start-zookeeper-quorum.sh
Starting zookeeper daemon on host localhost.
           

4、啟動 HA 叢集:

$ bin/yarn-session.sh -n 2
           

總結

本篇文章再次寫了下 Flink JobManager 的高可用配置,如何在 standalone 叢集和 YARN 叢集中配置高可用。

關注我

微信公衆号:zhisheng

另外我自己整理了些 Flink 的學習資料,目前已經全部放到微信公衆号了。你可以加我的微信:zhisheng_tian,然後回複關鍵字:Flink 即可無條件擷取到。

《從0到1學習Flink》—— Flink JobManager 高可用性配置

更多私密資料請加入知識星球!

《從0到1學習Flink》—— Flink JobManager 高可用性配置

Github 代碼倉庫

https://github.com/zhisheng17/flink-learning/

以後這個項目的所有代碼都将放在這個倉庫裡,包含了自己學習 flink 的一些 demo 和部落格

相關文章

1、《從0到1學習Flink》—— Apache Flink 介紹

2、《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境并建構運作簡單程式入門

3、《從0到1學習Flink》—— Flink 配置檔案詳解

4、《從0到1學習Flink》—— Data Source 介紹

5、《從0到1學習Flink》—— 如何自定義 Data Source ?

6、《從0到1學習Flink》—— Data Sink 介紹

7、《從0到1學習Flink》—— 如何自定義 Data Sink ?

8、《從0到1學習Flink》—— Flink Data transformation(轉換)

9、《從0到1學習Flink》—— 介紹Flink中的Stream Windows

10、《從0到1學習Flink》—— Flink 中的幾種 Time 詳解

11、《從0到1學習Flink》—— Flink 寫入資料到 ElasticSearch

12、《從0到1學習Flink》—— Flink 項目如何運作?

13、《從0到1學習Flink》—— Flink 寫入資料到 Kafka

14、《從0到1學習Flink》—— Flink JobManager 高可用性配置

15、《從0到1學習Flink》—— Flink parallelism 和 Slot 介紹

16、《從0到1學習Flink》—— Flink 讀取 Kafka 資料批量寫入到 MySQL