天天看點

Flink流處理之疊代案例

目前Flink将疊代的重心集中在批處理上,之前我們談及了批量疊代和增量疊代主要是針對批處理(DataSet)API而言的,并且Flink為批進行中的疊代提供了針對性的優化。但是對于流處理(DataStream),Flink同樣提供了對疊代的支援,這一節我們主要來分析流進行中的疊代,我們将會看到流進行中的疊代相較于批處理有相似之處,但差異也是十分之明顯。

可疊代的流處理程式允許定義“步函數”(step

function)并将其内嵌到一個可疊代的流(IterativeStream)中。因為一個流處理程式可能永不終止,是以不同于批進行中的疊代機制,流進行中無法設定疊代的最大次數。取而代之的是,你可以指定等待回報輸入的最大時間間隔(如果超過該時間間隔沒有回報元素到來,那麼該疊代将會終止)。通過應用split或filter轉換,你可以指定流的哪一部分用于回報給疊代頭,哪一部分分發給下遊。這裡我們以filter作為示例來展示可疊代的流處理程式的API使用模式。

首先,基于輸入流建構IterativeStream,這是一個疊代的起始,通常稱之為疊代頭:

接着,我們指定一系列的轉換操作用于表述在疊代過程中執行的邏輯(這裡簡單以map轉換作為示例),map

API所接受的UDF就是我們上文所說的步函數:

然後,作為疊代我們肯定需要有資料回報給疊代頭進行重複計算,是以我們從疊代過的流中過濾出符合條件的元素組成的部分流,我們稱之為回報流:

将回報流回報給疊代頭就意味着一個疊代的完整邏輯的完成,那麼它就可以“關閉”這個閉合的“環”了。通過調用IterativeStream的closeWith這一執行個體方法可以關閉一個疊代(也可表述為定義了疊代尾)。傳遞給closeWith的資料流将會回報給疊代頭:

另外,一個慣用的模式是過濾出需要繼續向前分發的部分流,這個過濾轉換其實定義的是“終止疊代”的邏輯條件,符合條件的元素将被分發給下遊而不用于進行下一次疊代:

跟分析批進行中的疊代一樣,我們仍然以解決實際問題的案例作為切入點來看看流進行中的疊代跟批進行中的疊代有何不同。

首先描述一下需要解決的問題:産生一個由一系列二進制組(兩個字段都是在一個區間内産生的正整數來作為斐波那契數列的兩個初始值)構成的資料流,然後對該資料流中的二進制組不斷地疊代使其産生斐波那契數列,直到某次産生的值大于給定的門檻值,則停止疊代并輸出疊代次數。

該案例參考自Flink随源碼釋出的疊代示例,此案例問題規模較小并且能夠說明問題。但它示例代碼中的一系列變量稍顯混亂,為了增強程式的表述性,筆者會對其稍作調整。

這個案例如果拆分到對單個元素(二進制組)的角度來看,其執行過程如下圖所示:

n表示疊代次數,在最初的map轉換中初始化為0;m是判定疊代停止的門檻值; 另外,T後面跟的是字段索引,比如T2表示取元組中位置為3的字段。且注意随着疊代T在不斷變化。

上面我們已經對問題的核心過程進行了分析,接下來我們會分步解決這個問題的建構疊代的流處理程式。

首先,我們先通過source函數建立初始的流對象inputStream:

該source函數會生成二進制組序列,二進制組的兩個字段值是随機生成的作為斐波那契數列的初始值:

為了對新計算的斐波那契數列中的值以及累加的疊代次數進行存儲,我們需要将二進制組資料流轉換為五元組資料流,并據此建立疊代對象:

注意上面代碼段中iterate API的參數5000,不是指疊代5000次,而是等待回報輸入的最大時間間隔為5秒。流被認為是無界的,是以無法像批處理疊代那樣指定最大疊代次數。但它允許指定一個最大等待間隔,如果在給定的時間間隔裡沒有元素到來,那麼将會終止疊代。

元組轉換的map函數實作:

上面五元組中,其中索引為0,1這兩個位置的元素,始終都是最初生成的兩個元素不會變化,而後三個字段都會随着疊代而變化。

在疊代流iterativeStream建立完成之後,我們将基于它執行斐波那契數列的步函數并産生斐波那契數列流fibonacciStream:

這裡的fibonacciStream隻是一個代稱,其中的資料并不是真正的斐波那契數列,其實就是上面那個五元組。

其中用于計算斐波那契數列的步函數實作如下:

正如上文所述,後三個字段會産生變化,在計算之前,數列最後一個元素會被保留,也就是f3對應的元素,然後通過f2元素加上f3元素會産生最新值并更新f3元素,而f4則會累加。

随着疊代次數增加,不是整個數列都會被保留,隻有最初的兩個元素和最新的兩個元素會被保留,這裡也沒必要保留整個數列,因為我們不需要完整的數列,我們隻需要對最新的兩個元素進行判斷即可。

上文我們對每個元素計算斐波那契數列的新值并産生了fibonacciStream,但是我們需要對最新的兩個值進行判斷,看它們是否超過了指定的門檻值。超過了門檻值的元組将會被輸出,而沒有超過的則會再次參與疊代。是以這将産生兩個不同的分支,我們也為此建構了分支流:

而對是否超過門檻值的元組進行判斷并分離的實作如下:

在篩選方法select中,我們對不同的分支以不同的常量辨別符進行辨別:ITERATE_FLAG(還要繼續疊代)和OUTPUT_FLAG(直接輸出)。

産生了分支流之後,我們就可以從中檢出不同的流分支做疊代或者輸出處理。對需要再次疊代的,就通過疊代流的closeWith方法回報給疊代頭:

而對于不需要的疊代就直接讓其流向下遊處理,這裡我們隻是簡單得将流“重構”了一下然後直接輸出:

所謂的重構就是将之前的五元組重新縮減為三元組,實作如下:

最終我們将會得到類似如下的輸出:

(7,14,5)

(18,37,3)

(3,46,3)

(23,32,3)

(31,43,2)

(13,45,2)

(37,42,2)

……

前兩個整數是斐波那契數列的兩個初始值,第三個整數表示其需要經曆多少次疊代其斐波那契數列最新的兩個值才會超過門檻值。

最終完整的主幹程式代碼如下:

<code>env.execute("Streaming Iteration Example");</code>

<code></code>