flink 認為 batch 是 streaming 的一個特例,是以 flink 底層引擎是一個流式引擎,在上面實作了流處理和批處理。而視窗(window)就是從 streaming 到 batch 的一個橋梁。flink 提供了非常完善的視窗機制,這是我認為的 flink 最大的亮點之一(其他的亮點包括消息亂序處理,和 checkpoint 機制)。本文我們将介紹流式進行中的視窗概念,介紹 flink 内建的一些視窗和 window api,最後讨論下視窗在底層是如何實作的。
在流處理應用中,資料是連續不斷的,是以我們不可能等到所有資料都到了才開始處理。當然我們可以每來一個消息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鐘内有多少使用者點選了我們的網頁。在這種情況下,我們必須定義一個視窗,用來收集最近一分鐘内的資料,并對這個視窗内的資料進行計算。
視窗可以是時間驅動的(time window,例如:每30秒鐘),也可以是資料驅動的(count window,例如:每一百個元素)。一種經典的視窗分類可以分成:翻滾視窗(tumbling window,無重疊),滾動視窗(sliding window,有重疊),和會話視窗(session window,活動間隙)。
我們舉個具體的場景來形象地了解不同視窗的概念。假設,淘寶網會記錄每個使用者每次購買的商品個數,我們要做的是統計不同視窗中使用者購買商品的總數。下圖給出了幾種經典的視窗切分概述圖:
上圖中,raw data stream 代表使用者的購買行為流,圈中的數字代表該使用者本次購買的商品個數,事件是按時間分布的,是以可以看出事件之間是有time gap的。flink 提供了上圖中所有的視窗類型,下面我們會逐一進行介紹。
tumbling time window
如上圖,我們需要統計每一分鐘中使用者購買的商品的總數,需要将使用者的行為事件按每一分鐘進行切分,這種切分被成為翻滾時間視窗(tumbling time window)。翻滾視窗能将資料流切分成不重疊的視窗,每一個事件隻能屬于一個視窗。通過使用 datastream api,我們可以這樣實作:
sliding time window
但是對于某些應用,它們需要的視窗是不間斷的,需要平滑地進行視窗聚合。比如,我們可以每30秒計算一次最近一分鐘使用者購買的商品總數。這種視窗我們稱為滑動時間視窗(sliding time window)。在滑窗中,一個元素可以對應多個視窗。通過使用 datastream api,我們可以這樣實作:
count window 是根據元素個數對資料流進行分組的。
tumbling count window
當我們想要每100個使用者購買行為事件統計購買總數,那麼每當視窗中填滿100個元素了,就會對視窗進行計算,這種視窗我們稱之為翻滾計數視窗(tumbling count window),上圖所示視窗大小為3個。通過使用 datastream api,我們可以這樣實作:
sliding count window
當然count window 也支援 sliding window,雖在上圖中未描述出來,但和sliding time window含義是類似的,例如計算每10個元素計算一次最近100個元素的總和,代碼示例如下。
在這種使用者互動事件流中,我們首先想到的是将事件聚合到會話視窗中(一段使用者持續活躍的周期),由非活躍的間隙分隔開。如上圖所示,就是需要計算每個使用者在活躍期間總共購買的商品數量,如果使用者30秒沒有活動則視為會話斷開(假設raw data stream是單個使用者的購買行為流)。session window 的示例代碼如下:
一般而言,window 是在無限的流上定義了一個有限的元素集合。這個集合可以是基于時間的,元素個數的,時間和個數結合的,會話間隙的,或者是自定義的。flink 的 datastream api 提供了簡潔的算子來滿足常用的視窗操作,同時提供了通用的視窗機制來允許使用者自己定義視窗配置設定邏輯。下面我們會對 flink 視窗相關的 api 進行剖析。
得益于 flink window api 松耦合設計,我們可以非常靈活地定義符合特定業務的視窗。flink 中定義一個視窗主要需要以下三個元件。
window assigner:用來決定某個元素被配置設定到哪個/哪些視窗中去。
trigger:觸發器。決定了一個視窗何時能夠被計算或清除,每個視窗都會擁有一個自己的trigger。
evictor:可以譯為“驅逐者”。在trigger觸發之後,在視窗被處理之前,evictor(如果有evictor的話)會用來剔除視窗中不需要的元素,相當于一個filter。
上述三個元件的不同實作的不同組合,可以定義出非常複雜的視窗。flink 中内置的視窗也都是基于這三個元件構成的,當然内置視窗有時候無法解決使用者特殊的需求,是以 flink 也暴露了這些視窗機制的内部接口供使用者實作自定義的視窗。下面我們将基于這三者探讨視窗的實作機制。
下圖描述了 flink 的視窗機制以及各元件之間是如何互相工作的。
每一個視窗都擁有一個屬于自己的 trigger,trigger上會有定時器,用來決定一個視窗何時能夠被計算或清除。每當有元素加入到該視窗,或者之前注冊的定時器逾時了,那麼trigger都會被調用。trigger的傳回結果可以是 continue(不做任何操作),fire(處理視窗資料),purge(移除視窗和視窗中的資料),或者 fire + purge。一個trigger的調用結果隻是fire的話,那麼會計算視窗并保留視窗原樣,也就是說視窗中的資料仍然保留不變,等待下次trigger fire的時候再次執行計算。一個視窗可以被重複計算多次知道它被 purge 了。在purge之前,視窗會一直占用着記憶體。
當trigger fire了,視窗中的元素集合就會交給<code>evictor</code>(如果指定了的話)。evictor 主要用來周遊視窗中的元素清單,并決定最先進入視窗的多少個元素需要被移除。剩餘的元素會交給使用者指定的函數進行視窗的計算。如果沒有 evictor 的話,視窗中的所有元素會一起交給函數進行計算。
計算函數收到了視窗的元素(可能經過了 evictor 的過濾),并計算出視窗的結果值,并發送給下遊。視窗的結果值可以是一個也可以是多個。datastream api 上可以接收不同類型的計算函數,包括預定義的<code>sum()</code>,<code>min()</code>,<code>max()</code>,還有 <code>reducefunction</code>,<code>foldfunction</code>,還有<code>windowfunction</code>。windowfunction 是最通用的計算函數,其他的預定義的函數基本都是基于該函數實作的。
flink 對于一些聚合類的視窗計算(如sum,min)做了優化,因為聚合類的計算不需要将視窗中的所有資料都儲存下來,隻需要儲存一個result值就可以了。每個進入視窗的元素都會執行一次聚合函數并修改result值。這樣可以大大降低記憶體的消耗并提升性能。但是如果使用者定義了 evictor,則不會啟用對聚合視窗的優化,因為 evictor 需要周遊視窗中的所有元素,必須要将視窗中所有元素都存下來。
上述的三個元件構成了 flink 的視窗機制。為了更清楚地描述視窗機制,以及解開一些疑惑(比如 purge 和 evictor 的差別和用途),我們将一步步地解釋 flink 内置的一些視窗(time window,count window,session window)是如何實作的。
count window 是使用三元件的典範,我們可以在 <code>keyedstream</code> 上建立 count window,其源碼如下所示:
第一個函數是申請翻滾計數視窗,參數為視窗大小。第二個函數是申請滑動計數視窗,參數分别為視窗大小和滑動大小。它們都是基于 <code>globalwindows</code> 這個 windowassigner 來建立的視窗,該assigner會将所有元素都配置設定到同一個global window中,所有<code>globalwindows</code>的傳回值一直是 <code>globalwindow</code> 單例。基本上自定義的視窗都會基于該assigner實作。
翻滾計數視窗并不帶evictor,隻注冊了一個trigger。該trigger是帶purge功能的 counttrigger。也就是說每當視窗中的元素數量達到了 window-size,trigger就會傳回fire+purge,視窗就會執行計算并清空視窗中的所有元素,再接着儲備新的元素。進而實作了tumbling的視窗之間無重疊。
滑動計數視窗的各視窗之間是有重疊的,但我們用的 globalwindows assinger 從始至終隻有一個視窗,不像 sliding time assigner 可以同時存在多個視窗。是以trigger結果不能帶purge,也就是說計算完視窗後視窗中的資料要保留下來(供下個滑窗使用)。另外,trigger的間隔是slide-size,evictor的保留的元素個數是window-size。也就是說,每個滑動間隔就觸發一次視窗計算,并保留下最新進入視窗的window-size個元素,剔除舊元素。
假設有一個滑動計數視窗,每2個元素計算一次最近4個元素的總和,那麼視窗工作示意圖如下所示:
圖中所示的各個視窗邏輯上是不同的視窗,但在實體上是同一個視窗。該滑動計數視窗,trigger的觸發條件是元素個數達到2個(每進入2個元素就會觸發一次),evictor保留的元素個數是4個,每次計算完視窗總和後會保留剩餘的元素。是以第一次觸發trigger是當元素5進入,第三次觸發trigger是當元素2進入,并驅逐5和2,計算剩餘的4個元素的總和(22)并發送出去,保留下2,4,9,7元素供下個邏輯視窗使用。
同樣的,我們也可以在 <code>keyedstream</code> 上申請 time window,其源碼如下所示:
在方法體内部會根據目前環境注冊的時間類型,使用不同的windowassigner建立window。可以看到,eventtime和ingesttime都使用了<code>xxxeventtimewindows</code>這個assigner,因為eventtime和ingesttime在底層的實作上隻是在source處為record打時間戳的實作不同,在window operator中的處理邏輯是一樣的。
這裡我們主要分析sliding process time window,如下是相關源碼:
首先,<code>slidingprocessingtimewindows</code>會對每個進入視窗的元素根據系統時間配置設定到<code>(size / slide)</code>個不同的視窗,并會在每個視窗上根據視窗結束時間注冊一個定時器(相同視窗隻會注冊一份),當定時器逾時時意味着該視窗完成了,這時會回調對應視窗的trigger的<code>onprocessingtime</code>方法,傳回fire_and_purge,也就是會執行視窗計算并清空視窗。整個過程示意圖如下:
如上圖所示橫軸代表時間戳(為簡化問題,時間戳從0開始),第一條record會被配置設定到[-5,5)和[0,10)兩個視窗中,當系統時間到5時,就會計算[-5,5)視窗中的資料,并将結果發送出去,最後清空視窗中的資料,釋放該視窗資源。
<a href="https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#time-and-windows">flink concepts</a>
[introducing stream windows in apache flink
<a href="https://cwiki.apache.org/confluence/display/flink/streaming+window+join+rework">streaming window join rework</a>
<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageid=60624830">window semantics (and implementation)</a>
<a href="http://blog.madhukaraphatak.com/introduction-to-flink-streaming-part-6">introduction to flink streaming - part 6 : anatomy of window api</a>
<a href="http://blog.madhukaraphatak.com/introduction-to-flink-streaming-part-5">introduction to flink streaming - part 5 : window api in flink</a>