天天看點

Flink流處理之疊代API分析IterativeStreamFeedbackTransformation

Flink在DataStream中也是通過一個特定的可疊代的流(IterativeStream)來建構相關的疊代處理邏輯,這一點跟DataSet提供的可疊代的資料集(IterativeDataSet)的是類似的。

IterativeStream繼承自DataStream,是以DataStream支援的轉換函數,在IterativeStream上同樣可以調用。

IterativeStream的執行個體是通過DataStream的iterate方法建立的˙。iterate方法存在兩個重載形式:一種是無參的,表示不限定最大等待時間;另一種提供一個長整型的maxWaitTimeMillis參數,允許使用者指定等待回報邊的下一個輸入元素的最大時間間隔。而疊代的關閉是通過調用IterativeStream的執行個體方法closeWith來實作的。

每一種資料流(DataStream)都會有與之對應的流轉換(StreamTransformation)。IterativeStream對應的轉換是FeedbackTransformation。

我們來看一下closeWith方法的實作:

解釋一下上文代碼中為什麼要檢查前任StreamTransformation對象的原因。我們結合上一篇的案例中的代碼片段來看:

這裡傳遞給closeWith的是branchedStream篩選出的資料流,而從branchedStream向上是能追溯到可疊代的流iterativeStream的,是以滿足前任追溯到疊代頭的條件。是以這裡需要基于前任向上遊追溯的原因是確定回報流是的源頭是來自疊代頭(進而形成疊代這樣一個閉環),而不是任意的某個流都可以作為回報流。

另外,IterativeStream通過調用withFeedbackType方法還可以改變或者重新指定疊代回報流的類型,進而形成一個跟最初的輸入流組合而成的連接配接疊代流(ConnectedIterativeStreams),這一點也是批進行中的疊代所不具備的。示例代碼如下:

ConnectedIterativeStreams是ConnectedStreams的特例,它表示将疊代最初的輸入與回報邊的輸入進行連接配接所形成的ConnectedStreams。ConnectedIterativeStreams的構造器會要求指定回報流的資料類型資訊(TypeInformation),你可以為其指定Flink所支援的任意類型。ConnectedIterativeStreams對應的轉換是CoFeedbackTransformation,我們在下面會順帶介紹。

當IterativeStream轉變為雙流連接配接而成的ConnectedIterativeStreams,轉換也從FeedbackTransformation轉變為CoFeedbackTransformation,是以ConnectedIterativeStreams也提供了自己的closeWith方法來将CoFeedbackTransformation添加為自己的回報邊。在實作上和IterativeStream是類似的,不再贅述。

疊代流(IterativeStream)對應的轉換是回報轉換(FeedbackTransformation),它表示拓撲中的一個回報點(也即疊代頭)。一個回報點包含一個輸入邊以及若幹個回報邊,且Flink要求每個回報邊的并行度必須跟輸入邊的并行度一緻,這一點在往該轉換中加入回報邊時會進行校驗。

這裡并行度一緻的原因是Flink将采用一種CoLocationGroup來優化疊代任務的子任務執行。當一組作業頂點(JobVertex,一個任務在JobGraph中的表示)被包含在同一個CoLocationGroup中的時候,這些JobVertex在運作時所對應的任務的第i個子任務必須運作在同一個TaskManager的JVM執行個體中。那麼一個分布式的疊代作業,其疊代部分是就是并行度個執行體在并行執行,而每個執行體中的子任務都在位于同一個TaskManager的執行個體中多線程的形式并發地執行,其中還涉及到并發環境下的資料交換,後續會進行分析。

當IterativeStream對象被構造時,FeedbackTransformation的執行個體會被建立并傳遞給DataStream的構造方法:

每一個流轉換對象都要實作獲得其前任轉換對象集合的getTransitivePredecessors方法,FeedbackTransformation對該方法的實作如下:

在上面分析IterativeStream時,我們提過它可以轉換為ConnectedIterativeStreams,ConnectedIterativeStreams對應的CoFeedbackTransformation這裡我們也一并分析一下。CoFeedbackTransformation跟FeedbackTransformation一樣都表示拓撲中的一個回報點。對于CoFeedbackTransformation轉換,它不要求回報邊元素的類型跟上遊輸入端元素的類型一緻。因為上遊流将會成為該轉換的第一個輸入,而回報流将會成為該轉換的第二個輸入。因為兩個流會在此連接配接,是以CoFeedbackTransformation後隻允許跟TwoInputTransformations類型的轉換。

CoFeedbackTransformation同樣對輸入端的并行度和回報邊的并行度有一定的限制,它也要求兩者的并行度必須相等,但是它們的分區政策可以是不一緻的。

Flink在建立ConnectedIterativeStreams流對象時,會用疊代流的初始輸入來作為ConnectedIterativeStreams的第一個輸入流,然後用CoFeedbackTransformation來建構參與連接配接的第二個流對象,這裡可以指定跟疊代流類型不同的feedbackType作為第二個流的類型:

這一篇我們介紹了IterativeStream以及ConnectedIterativeStreams所對應的轉換對象,下一篇我們分析StreamGraph的疊代相關的内容時,将會剖析Flink如何将FeedbackTransformation轉換為算子。

原文釋出時間為:2016-12-04

本文作者:vinoYang

本文來自雲栖社群合作夥伴CSDN部落格,了解相關資訊可以關注CSDN部落格。