要了解一個系統,一般都是從架構開始。我們關心的問題是:系統部署成功後各個節點都啟動了哪些服務,各個服務之間又是怎麼互動和協調的。下方是 flink 叢集啟動後架構圖。
當 flink 叢集啟動後,首先會啟動一個 jobmanger 和一個或多個的 taskmanager。由 client 送出任務給 jobmanager,jobmanager 再排程任務到各個 taskmanager 去執行,然後 taskmanager 将心跳和統計資訊彙報給 jobmanager。taskmanager 之間以流的形式進行資料的傳輸。上述三者均為獨立的 jvm 程序。
client 為送出 job 的用戶端,可以是運作在任何機器上(與 jobmanager 環境連通即可)。送出 job 後,client 可以結束程序(streaming的任務),也可以不結束并等待結果傳回。
jobmanager 主要負責排程 job 并協調 task 做 checkpoint,職責上很像 storm 的 nimbus。從 client 處接收到 job 和 jar 包等資源後,會生成優化後的執行計劃,并以 task 的單元排程到各個 taskmanager 去執行。
taskmanager 在啟動的時候就設定好了槽位數(slot),每個 slot 能啟動一個 task,task 為線程。從 jobmanager 處接收需要部署的 task,部署啟動後,與自己的上遊建立 netty 連接配接,接收資料并處理。
可以看到 flink 的任務排程是多線程模型,并且不同job/task混合在一個 taskmanager 程序中。雖然這種方式可以有效提高 cpu 使用率,但是個人不太喜歡這種設計,因為不僅缺乏資源隔離機制,同時也不友善調試。類似 storm 的程序模型,一個jvm 中隻跑該 job 的 tasks 實際應用中更為合理。
本文所示例子為 flink-1.0.x 版本
我們使用 flink 自帶的 examples 包中的 <code>sockettextstreamwordcount</code>,這是一個從 socket 流中統計單詞出現次數的例子。
首先,使用 netcat 啟動本地伺服器:
然後送出 flink 程式
在netcat端輸入單詞并監控 taskmanager 的輸出可以看到單詞統計的結果。
<code>sockettextstreamwordcount</code> 的具體代碼如下:
但這并不是最終在 flink 中運作的執行圖,隻是一個表示拓撲節點關系的計劃圖,在 flink 中對應了 steramgraph。另外,送出拓撲後(并發度設為2)還能在 ui 中看到另一張執行計劃圖,如下所示,該圖對應了 flink 中的 jobgraph。
看起來有點亂,怎麼有這麼多不一樣的圖。實際上,還有更多的圖。flink 中的執行圖可以分成四層:streamgraph -> jobgraph -> executiongraph -> 實體執行圖。
streamgraph:是根據使用者通過 stream api 編寫的代碼生成的最初的圖。用來表示程式的拓撲結構。
jobgraph:streamgraph經過優化後生成了 jobgraph,送出給 jobmanager 的資料結構。主要的優化為,将多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少資料在節點之間流動所需要的序列化/反序列化/傳輸消耗。
executiongraph:jobmanager 根據 jobgraph 生成的分布式執行圖,是排程層最核心的資料結構。
實體執行圖:jobmanager 根據 executiongraph 對 job 進行排程後,在各個taskmanager 上部署 task 後形成的“圖”,并不是一個具體的資料結構。
例如上文中的2個并發度(source為1個并發度)的 <code>sockettextstreamwordcount</code> 四層執行圖的演變過程如下圖所示(點選檢視大圖):
這裡對一些名詞進行簡單的解釋。
streamgraph:根據使用者通過 stream api 編寫的代碼生成的最初的圖。
streamnode:用來代表 operator 的類,并具有所有相關的屬性,如并發度、入邊和出邊等。
streamedge:表示連接配接兩個streamnode的邊。
jobgraph:streamgraph經過優化後生成了 jobgraph,送出給 jobmanager 的資料結構。
jobvertex:經過優化後符合條件的多個streamnode可能會chain在一起生成一個jobvertex,即一個jobvertex包含一個或多個operator,jobvertex的輸入是jobedge,輸出是intermediatedataset。
intermediatedataset:表示jobvertex的輸出,即經過operator處理産生的資料集。producer是jobvertex,consumer是jobedge。
jobedge:代表了job graph中的一條資料傳輸通道。source 是 intermediatedataset,target 是 jobvertex。即資料通過jobedge由intermediatedataset傳遞給目标jobvertex。
executionjobvertex:和jobgraph中的jobvertex一一對應。每一個executionjobvertex都有和并發度一樣多的 executionvertex。
executionvertex:表示executionjobvertex的其中一個并發子任務,輸入是executionedge,輸出是intermediateresultpartition。
intermediateresult:和jobgraph中的intermediatedataset一一對應。每一個intermediateresult有與下遊executionjobvertex相同并發數的intermediateresultpartition。
intermediateresultpartition:表示executionvertex的一個輸出分區,producer是executionvertex,consumer是若幹個executionedge。
executionedge:表示executionvertex的輸入,source是intermediateresultpartition,target是executionvertex。source和target都隻能是一個。
execution:是執行一個 executionvertex 的一次嘗試。當發生故障或者資料需要重算的情況下 executionvertex 可能會有多個 executionattemptid。一個 execution 通過 executionattemptid 來唯一辨別。jm和tm之間關于 task 的部署和 task status 的更新都是通過 executionattemptid 來确定消息接受者。
task:execution被排程後在配置設定的 taskmanager 中啟動對應的 task。task 包裹了具有使用者執行邏輯的 operator。
resultpartition:代表由一個task的生成的資料,和executiongraph中的intermediateresultpartition一一對應。
resultsubpartition:是resultpartition的一個子分區。每個resultpartition包含多個resultsubpartition,其數目要由下遊消費 task 數和 distributionpattern 來決定。
inputgate:代表task的輸入封裝,和jobgraph中jobedge一一對應。每個inputgate消費了一個或多個的resultpartition。
inputchannel:每個inputgate會包含一個以上的inputchannel,和executiongraph中的executionedge一一對應,也和resultsubpartition一對一地相連,即一個inputchannel接收一個resultsubpartition的輸出。
那麼 flink 為什麼要設計這4張圖呢,其目的是什麼呢?spark 中也有多張圖,資料依賴圖以及實體執行的dag。其目的都是一樣的,就是解耦,每張圖各司其職,每張圖對應了 job 不同的階段,更友善做該階段的事情。我們給出更完整的 flink graph 的層次圖。
首先我們看到,jobgraph 之上除了 streamgraph 還有 optimizedplan。optimizedplan 是由 batch api 轉換而來的。streamgraph 是由 stream api 轉換而來的。為什麼 api 不直接轉換成 jobgraph?因為,batch 和 stream 的圖結構和優化方法有很大的差別,比如 batch 有很多執行前的預分析用來優化圖的執行,而這種優化并不普适于 stream,是以通過 optimizedplan 來做 batch 的優化會更友善和清晰,也不會影響 stream。jobgraph 的責任就是統一 batch 和 stream 的圖,用來描述清楚一個拓撲圖的結構,并且做了 chaining 的優化,chaining 是普适于 batch 和 stream 的,是以在這一層做掉。executiongraph 的責任是友善排程和各個 tasks 狀态的監控和跟蹤,是以 executiongraph 是并行化的 jobgraph。而“實體執行圖”就是最終分布式在各個機器上運作着的tasks了。是以可以看到,這種解耦方式極大地友善了我們在各個層所做的工作,各個層之間是互相隔離的。
後續的文章,将會詳細介紹 flink 是如何生成這些執行圖的。由于我目前關注 flink 的流處理功能,是以主要有以下内容:
<a href="http://wuchong.me/blog/2016/05/04/flink-internal-how-to-build-streamgraph/">如何生成 streamgraph</a>
<a href="http://wuchong.me/blog/2016/05/10/flink-internals-how-to-build-jobgraph/">如何生成 jobgraph</a>
如何生成 executiongraph
如何進行排程(如何生成實體執行圖)