Flink是目前最火的實時流處理架構,這次總結下關于Flink的元件和架構相關的知識,其中包括JobMaster和TaskManager的功能介紹以及作業的送出流程和排程原理,讓我們開發不但知道Flink的怎麼用的,更能清楚的了解Flink底層的運作邏輯,加深我們對Flink的了解。
本文參考官方文檔:https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/concepts/flink-architecture/
一、Flink運作時元件
官方文檔架構圖:
翻譯後的架構圖:
1. 作業管理器JobManager
總結:JobManager是Flink的管理節點,用于管理TaskManager和作業排程器以及檢查點協調器,實際上的管理者
JobManager是Flink實際意義上的管理者,是控制應用程式的核心,每個應用都會被JobManager管理,可以了解為HDFS當中的NameNode節點的作用,當然從官方提供的架構圖來看JobManager也存在單點故障的情況,是以我們可以配置多個JobManager來實作高可用(HA),但是和NameNode一樣,同一時刻隻能有一個對外提供服務,另一個狀态為standby(備用)節點。
JobManager 具有許多與協調 Flink 應用程式的分布式執行有關的職責:它決定何時排程下一個 task(或一組 task)、對完成的 task 或執行失敗做出反應、協調 checkpoint、并且協調從失敗中恢複等等。這個程序由三個不同的元件組成:
1. JobMaster
JobMaster 負責管理單個JobGraph的執行。Flink 叢集中可以同時運作多個作業,每個作業都有自己的 JobMaster。
始終至少有一個 JobManager。高可用(HA)設定中可能有多個 JobManager,其中一個始終是 leader,其他的則是 standby。
2. 資料總管(ResourceManager)
ResourceManager 負責 Flink 叢集中的資源提供、回收、配置設定 - 它管理 task slots,這是 Flink 叢集中資源排程的機關。Flink 為不同的環境和資源提供者(例如 YARN、Kubernetes 和 standalone 部署)實作了對應的 ResourceManager。在 standalone 設定中,ResourceManager 隻能配置設定可用 TaskManager 的 slots,而不能自行啟動新的 TaskManager。
3. 分發器(Dispatcher)
Dispatcher 提供了一個 REST 接口,用來送出 Flink 應用程式執行,并為每個送出的作業啟動一個新的 JobMaster。它還運作 Flink WebUI 用來提供作業執行資訊。
2. 任務管理器TaskManager
總結:TaskManager是Flink的執行節點,用于執行任務的節點,實際上的工作者
TaskManager(也稱為 worker)執行作業流的 task,并且緩存和交換資料流。
必須始終至少有一個 TaskManager。在 TaskManager 中資源排程的最小機關是 task slot。TaskManager 中 task slot 的數量表示并發處理 task 的數量。請注意一個 task slot 中可以執行多個算子(請參考Tasks 和算子鍊)。
二、任務送出流程
流程大緻都是一樣的,隻是在不同的資源管理中某些細節不一緻
1. 獨立部署
- 用戶端通過分發器提供的 REST 接口,将作業送出給JobManager。
- 由分發器啟動 JobMaster,并将作業(包含 JobGraph)送出給 JobMaster。
- JobMaster 将 JobGraph 解析為可執行的 ExecutionGraph,得到所需的資源數量,然後向資料總管請求資源(slots)。
- 資料總管通知 TaskManager 為新的作業提供 slots。
- TaskManager 連接配接到對應的 JobMaster,提供 slots。
- JobMaster 将需要執行的任務分發給 TaskManager。
- TaskManager 執行任務,互相之間可以交換資料。
2. YARN部署
1. 會話模式
此處的資料總管和YARN資料總管是不同的東西
- 用戶端通過 REST 接口,将作業送出給分發器。
- 分發器啟動 JobMaster,并将作業(包含 JobGraph)送出給 JobMaster。
- JobMaster 向資料總管請求資源(slots)。
- 資料總管向 YARN 的資料總管請求 container 資源。
- YARN 啟動新的 TaskManager 容器。
- TaskManager 啟動之後,向 Flink 的資料總管注冊自己的可用任務槽。
- 資料總管通知 TaskManager 為新的作業提供 slots。
- TaskManager 連接配接到對應的 JobMaster,提供 slots。
- JobMaster 将需要執行的任務分發給 TaskManager,執行任務。
2. 單作業模式
- 用戶端将作業送出給 YARN 的資料總管,這一步中會同時将 Flink 的 Jar 包和配置上傳到 HDFS,以便後續啟動 Flink 相關元件的容器。
- YARN 的資料總管配置設定 Container 資源,啟動 Flink JobManager,并将作業送出給JobMaster。這裡省略了 Dispatcher 元件。
- JobMaster 向資料總管請求資源(slots)。
- 資料總管向 YARN 的資料總管請求 container 資源。
- YARN 啟動新的 TaskManager 容器。
- TaskManager 啟動之後,向 Flink 的資料總管注冊自己的可用任務槽。
- 資料總管通知 TaskManager 為新的作業提供 slots。
- TaskManager 連接配接到對應的 JobMaster,提供 slots。
- JobMaster 将需要執行的任務分發給 TaskManager,執行任務。
可見,差別隻在于 JobManager 的啟動方式,以及省去了分發器。當第 2 步作業送出給JobMaster,之後的流程就與會話模式完全一樣了。
3. 應用模式
和單作業模式基本一直,隻是初始送出給 YARN 資料總管的不再是具體的作業,而是整個應用。一個應用中可能包含了多個作業,這些作業都将在 Flink 叢集中啟動各自對應的 JobMaster。
三、相關元件内部原理
1. Tasks和算子鍊
總結:一個算子可以并行執行,形成并行子算子,代表這個算子的并發數,并發數小于等于可用的slot數量,算子鍊是相對于算子間資料傳輸而定的,比如下圖中的source到map兩個算子之間的資料交換是一對一的,是以Flink将此類稱之為算子鍊,而Map算子和keyby算子或map算子和window算子之間資料傳輸并不是一對一的,就不會形成算子鍊。算子鍊就相當于Spark中的窄依賴,算子之間資料傳輸是一對一的
2. Task Slots 和資源
每個 worker(TaskManager)都是一個 JVM 程序,可以在單獨的線程中執行一個或多個 subtask。為了控制一個 TaskManager 中接受多少個 task,就有了所謂的 task slots(至少一個)。意思就是一個TaskManager節點可以運作一個或多個Task Slot。
每個 task slot 代表 TaskManager 中資源的固定子集。例如,具有 3 個 slot 的 TaskManager,會将其托管記憶體 1/3 用于每個 slot。配置設定資源意味着 subtask 不會與其他作業的 subtask 競争托管記憶體,而是具有一定數量的保留托管記憶體。注意此處沒有 CPU 隔離;目前 slot 僅分離 task 的托管記憶體。
通過調整 task slot 的數量,使用者可以定義 subtask 如何互相隔離。每個 TaskManager 有一個 slot,這意味着每個 task 組都在單獨的 JVM 中運作(例如,可以在單獨的容器中啟動)。具有多個 slot 意味着更多 subtask 共享同一 JVM。同一 JVM 中的 task 共享 TCP 連接配接(通過多路複用)和心跳資訊。它們還可以共享資料集和資料結構,進而減少了每個 task 的開銷。
預設情況下,Flink 允許 subtask 共享 slot,即便它們是不同的 task 的 subtask,隻要是來自于同一作業即可。結果就是一個 slot 可以持有整個作業管道。允許 slot 共享有兩個主要優點:
- Flink 叢集所需的 task slot 和作業中使用的最大并行度恰好一樣。無需計算程式總共包含多少個 task(具有不同并行度)。
- 容易獲得更好的資源利用。如果沒有 slot 共享,非密集 subtask(source/map())将阻塞和密集型 subtask(window) 一樣多的資源。通過 slot 共享,我們示例中的基本并行度從 2 增加到 6,可以充分利用配置設定的資源,同時確定繁重的 subtask 在 TaskManager 之間公平配置設定。
3. 任務槽和并行度的關系
taskslot 是 靜 态 的 概 念 , 是 指 TaskManager 具 有 的 并 發 執 行 能 力 , 可 以 通 過 參 數taskmanager.numberOfTaskSlots 進行配置;而并行度( parallelism)是動态概念,也就是TaskManager 運作程式時實際使用的并發能力,可以通過參數 parallelism.default 進行配置。