天天看點

Apache Flink Client生成StreamGraph概述節點和邊生成StreamGraph小結

上文我們分析送出流程時,<code>RemoteStreamEnvironment</code>類的<code>execute</code>方法的第一步就是生成<code>StreamGraph</code>。

<code>StreamGraph</code>是用于表示流的拓撲結構的資料結構,它包含了生成<code>JobGraph</code>的必要資訊。它的類繼承關系圖如下:

如果你按照<code>StreamGraph</code>的繼承鍊向上追溯,最終會發現它實作了接口<code>FlinkPlan</code>。Flink在這裡效仿的是資料庫的執行SQL是産生執行計劃的機制,<code>FlinkPlan</code>定義在Flink的優化器相關的包中,針對流應用的計劃是<code>StreamingPlan</code>。

針對Batch類的應用的計劃類是OptimizedPlan。Flink會對Batch類的應用進行優化(這點我們後面會分析),而目前針對Streaming類的應用沒有優化措施。

<code>StreamGraph</code>的形象化表示如下圖:

上面的圖是由“節點”和“邊”組成的。節點在Flink中對應的資料結構是<code>StreamNode</code>,而邊在Flink中對應的資料結構是<code>StreamEdge</code>。<code>StreamNode</code>和<code>StreamEdge</code>之間存在着組合的依賴關系,依賴關系可見下圖:

<code>StreamEdge</code>包含了其連接配接的源節點<code>sourceVertex</code>和目的節點<code>targetVertex</code>,而<code>StreamNode</code>中包含了與其連接配接的入邊集合<code>inEdges</code>和出邊集合<code>outEdges</code>。<code>StreamEdge</code>和<code>StreamNode</code>都有唯一的編号進行辨別,但是各自編号的生成規則并不相同。

<code>StreamNode</code>的編号<code>id</code>的生成是通過調用<code>StreamTransformation</code>的靜态方法<code>getNewNodeId</code>獲得的,其實作是一個靜态計數器:

<code>StreamEdge</code>的編号<code>edgeId</code>是字元串類型,其生成的規則為:

它是由多個段連接配接起來的,語義的文字表述如下:

<code>edgeId</code>除了用來實作StreamEdge的hashCode及equals方法之外并沒有其他實際意義。

StreamNode其實是表示operator的資料結構,了解這一點很重要。從Flink開始生成StreamGraph開始,source、sink都是圖中的一個節點都是operator,都通過StreamNode這一資料結構來表示,我們常将它們單獨拎出來講是因為它們是流的的輸入和輸出,但在資料結構層面上它們是一緻的。

<code>StreamNode</code>除了存儲了輸入端和輸出端的<code>StreamEdge</code>集合,還封裝了<code>operator</code>的其他關鍵屬性,基于這不是我們關注的重點,是以不再贅述。

回過頭來我們看<code>JobGraph</code>就不是那麼難了解了。它包含了表述整個流拓撲的所有必要資訊(比如所有的節點集合、所有的<code>source</code>集合、所有的<code>sink</code>集合、虛拟輸出選擇節點、虛拟分區節點)。同時還包含了大量操作這些資訊的方法。

了解了基礎的資料結構之後,我們來分析如何生成<code>JobGraph</code>。定位到<code>getStreamGraph</code>的實作:

它依賴于<code>transformations</code>集合,該集合中存儲着一個<code>Streaming</code>程式中所有的轉換操作對應的<code>StreamTransformation</code>對象。

每當在<code>DataStream</code>對象上調用<code>transform</code>方法或者調用已經被實作了的一些轉換操作(如map、flter等,這些轉換操作在内部也調用了<code>transform</code>方法),這些調用都會被加入到<code>transformations</code>集合中。

StreamTransformation表示建立DataStream的操作,其實每個DataStream底層都對應着一個StreamTransformation。DataStream持有執行環境對象的引用,當調用transform方法時,它會調用執行環境對象的addOperator方法,将特定的StreamTransformation對象加入到transformations集合中去,這就是transformations集合中元素的來源。

到目前為止我們提到了多個名詞,它們之前擁有着強依賴關系,為了避免混淆,我們以flatMap轉換操作為例圖示各種對象之間的建構關系:

在源碼中,其實Flink自身的命名也并不是那麼準确,比如上圖中的SingleOutputStreamOperator其實是一種DataStream,但卻以Operator結尾,讓人匪夷所思。這種情況下,鑒定它們類型的方式可以通過檢視它們的繼承鍊來進行識别。

<code>StreamGraph</code>的生成依賴于生成器<code>StreamGraphGenerator</code>,每調用一次靜态方法<code>generate</code>才會在内部建立一個<code>StreamGraphGenerator</code>的執行個體,一個執行個體對應着一個<code>StreamGraph</code>對象。<code>StreamGraphGenerator</code>調用内部的執行個體方法<code>generateInternal</code>來周遊<code>transformations</code>集合的每個對象:

在<code>transform</code>方法中,它枚舉了Flink中每一種轉換類型,并對目前傳入的轉換類型進行判斷,然後将其分發給特定的轉換方法進行轉換,最終傳回目前<code>StreamGraph</code>對象中跟該轉換有關的節點編号集合。

你可以将整個過程看作是玩拼圖遊戲,每周遊完一個轉換對象,就離建構完整的<code>StreamGraph</code>更近一步。所有類型各異的轉換操作各自持有整個<code>StreamGraph</code>的一部分小圖檔,根據不同的轉換操作類型,它們為<code>StreamGraph</code>提供的“部件”并不完全相同,有的轉換隻建構節點(如<code>SourceTransformation</code>),有的轉換除了建構節點還建構邊(如<code>SinkTransformation</code>),有的隻建構虛拟節點(如<code>PartitionTransformation</code>、<code>SplitTransformation</code>、<code>SelectTransformation</code>)。

關于虛拟節點,這裡需要說明的是并非所有轉換操作都具有實際的實體意義(即實體上對應<code>operator</code>)。有些轉換操作隻具有邏輯概念,例如<code>union</code>,<code>split</code>,<code>select</code>,<code>partition</code>。這些轉換操作不會建構真實的<code>StreamNode</code>對象。比如某個流處理應用對應的轉換樹如下圖:

但在運作時,其生成的執行計劃,這裡也就等同于<code>StreamGraph</code>卻是下圖這種形式:

從圖中可以看到,轉換圖中對應的一些邏輯操作在産生的執行計劃時并不存在,Flink将這些邏輯轉換操作轉換成了虛拟節點,它們的資訊會被綁定到從<code>source</code>到<code>map</code>轉換的這條邊上。

在給<code>StreamGraph</code>建立并添加一個<code>operator</code>時,需要給該<code>operator</code>指定<code>slotSharingGroup</code>,這時需要調用方法<code>determineSlotSharingGroup</code>來獲得SlotSharingGroup的名稱:

當使用者指定了組名,則直接使用使用者指定的名稱。如果使用者沒有指定特定的名稱,則需要結合輸入節點來做決定:第一種情況如果所有的輸入節點都擁有相同的<code>slotSharingGroup</code>名稱,那麼就使用該組名;否則組名将被命名為<code>default</code>。

Flink目前對于流處理的應用是不作優化的,是以其執行計劃就是<code>StreamGraph</code>。Flink提供了一個執行計劃的可視化器,它将用戶端生成的執行計劃以圖形的方式展示出來,就像本節開始我們展示的那幅圖就是可視化器生成的。那麼我們怎麼來檢視我們自己編寫的程式的執行計劃呢?其實很簡單,我們以Flink的flink-examples-streaming包中的<code>SocketTextStreamWordCount</code>為例,來看一下如何生成執行計劃。

我們将<code>SocketTextStreamWordCount</code>最後一行代碼注釋掉:

然後将其替換成下面這句:

這行語句的作用是列印目前這個程式的執行計劃,它将在控制台産生該執行計劃的JSON格式表示:

把上面這段JSON複制到Flink的執行計劃可視化器,點選下方的<code>Draw</code>按鈕,即可生成。

本文我們談論了<code>StreamGraph</code>的資料結構以及<code>StreamGraphGenerator</code>如何生成<code>StreamGraph</code>。鑒于<code>StreamEdge</code>和<code>StreamNode</code>是組成<code>StreamGraph</code>不可或缺的部分,我們還對這兩個資料結構進行了簡單的分析。當然,<code>StreamGraph</code>還有一個關鍵的執行個體方法:<code>getJobGraph</code>,它用于擷取流處理程式的<code>JobGraph</code>(該方法繼承自<code>StreamingPlan</code>)。至于什麼是<code>JobGraph</code>以及如何擷取它,我們将在下文進行讨論。

原文釋出時間為:2016-07-23

本文作者:vinoYang