當我們需要分析使用者的一段互動的行為事件時,通常的想法是将使用者的事件流按照“session”來分組。session 是指一段持續活躍的期間,由活躍間隙分隔開。通俗一點說,消息之間的間隔小于逾時門檻值(sessiongap)的,則被配置設定到同一個視窗,間隔大于門檻值的,則被配置設定到不同的視窗。目前開源領域大部分的流計算引擎都有視窗的概念,但是沒有對 session window 的支援,要實作 session window,需要使用者自己去做完大部分事情。而當 flink 1.1.0 版本正式釋出時,flink 将會是開源流計算領域第一個内建支援 session window 的引擎。
但是這種session window的實作是非常弱的,無法應用到實際生産環境中的。因為它無法處理亂序 event time 的消息。 而在即将到來的 flink 1.1.0 版本中,flink 提供了對 session window 的直接支援,使用者可以通過<code>sessionwindows.withgap()</code>來輕松地定義 session widnow,而且能夠處理亂序消息。flink 對 session window 的支援主要借鑒自 google 的 dataflow 。
假設有這麼個場景,使用者點開手機淘寶後會進行一系列的操作(點選、浏覽、搜尋、購買、切換tab等),這些操作以及對應發生的時間都會發送到伺服器上進行使用者行為分析。那麼使用者的操作行為流的樣例可能會長下面這樣:
通過上圖,我們可以很直覺地觀察到,使用者的行為是一段一段的,每一段内的行為都是連續緊湊的,段内行為的關聯度要遠大于段之間行為的關聯度。我們把每一段使用者行為稱之為“session”,段之間的空檔我們稱之為“session gap”。是以,理所當然地,我們應該按照 session window 對使用者的行為流進行切分,并計算每個session的結果。如下圖所示:
為了定義上述的視窗切分規則,我們可以使用 flink 提供的 <code>sessionwindows</code> 這個 widnow assigner api。如果你用過 <code>slidingeventtimewindows</code>、<code>tumlingprocessingtimewindows</code>等,你會對這個很熟悉。
這樣,flink 就會基于元素的時間戳,自動地将元素放到不同的session window中。如果兩個元素的時間戳間隔小于 session gap,則會在同一個session中。如果兩個元素之間的間隔大于session gap,且沒有元素能夠填補上這個gap,那麼它們會被放到不同的session中。
為了實作 session window,我們需要擴充 flink 中的視窗機制,使得能夠支援視窗合并。要了解其原因,我們需要先了解視窗的現狀。在上一篇文章中,我們談到了 flink 中 windowassigner 負責将元素配置設定到哪個/哪些視窗中去,trigger 決定了一個視窗何時能夠被計算或清除。當元素被配置設定到視窗之後,這些視窗是固定的不會改變的,而且視窗之間不會互相作用。
對于session window來說,我們需要視窗變得更靈活。基本的思想是這樣的:<code>sessionwindows</code> assigner 會為每個進入的元素配置設定一個視窗,該視窗以元素的時間戳作為起始點,時間戳加會話逾時時間為結束點,也就是該視窗為<code>[timestamp, timestamp+sessiongap)</code>。比如我們現在到了兩個元素,它們被配置設定到兩個獨立的視窗中,兩個視窗目前不相交,如圖:
當第三個元素進入時,配置設定到的視窗與現有的兩個視窗發生了疊加,情況變成了這樣:
由于我們支援了視窗的合并,<code>windowassigner</code>可以合并這些視窗。它會周遊現有的視窗,并告訴系統哪些視窗需要合并成新的視窗。flink 會将這些視窗進行合并,合并的主要内容有兩部分:
需要合并的視窗的底層狀态的合并(也就是視窗中緩存的資料,或者對于聚合視窗來說是一個聚合值)
需要合并的視窗的trigger的合并(比如對于eventtime來說,會删除舊視窗注冊的定時器,并注冊新視窗的定時器)
總之,結果是三個元素現在在同一個視窗中了:
需要注意的是,對于每一個新進入的元素,都會配置設定一個屬于該元素的視窗,都會檢查并合并現有的視窗。在觸發視窗計算之前,每一次都會檢查該視窗是否可以和其他視窗合并,直到trigger觸發後,會将該視窗從視窗清單中移除。對于 event time 來說,視窗的觸發是要等到大于視窗結束時間的 watermark 到達,當watermark沒有到,視窗會一直緩存着。是以基于這種機制,可以做到對亂序消息的支援。
這裡有一個優化點可以做,因為每一個新進入的元素都會建立屬于該元素的視窗,然後合并。如果新元素連續不斷地進來,并且新元素的視窗一直都是可以和之前的視窗重疊合并的,那麼其實這裡多了很多不必要的建立視窗、合并視窗的操作,我們可以直接将新元素放到那個已存在的視窗,然後擴充該視窗的大小,看起來就像和新元素的視窗合并了一樣。
為了擴充 flink 中的視窗機制,使得能夠支援視窗合并,首先 window assigner 要能合并現有的視窗,flink 增加了一個新的抽象類 <code>mergingwindowassigner</code> 繼承自 <code>windowassigner</code>,這裡面主要多了一個 <code>mergewindows</code> 的方法,用來決定哪些視窗是可以合并的。
所有已經存在的 assigner 都繼承自 <code>windowassigner</code>,隻有新加入的 session window assigner 繼承自 <code>mergingwindowassigner</code>,如:<code>processingtimesessionwindows</code>和<code>eventtimesessionwindows</code>。
另外,trigger 也需要能支援對合并視窗後的響應,是以 trigger 添加了一個新的接口 <code>onmerge(w window, onmergecontext ctx)</code>,用來響應發生視窗合并之後對trigger的相關動作,比如根據合并後的視窗注冊新的 event time 定時器。
ok,接下來我們看下最核心的代碼,也就是對于每個進入的元素的處理,代碼位于<code>windowoperator.processelement</code>方法中,如下所示:
其實這段代碼寫的并不是很clean,并且不是很好了解。在第六行中有用到<code>mergingwindowset</code>,這個類很重要是以我們先介紹它。這是一個用來跟蹤視窗合并的類。比如我們有a、b、c三個視窗需要合并,合并後的視窗為d視窗。這三個視窗在底層都有對應的狀态集合,為了避免代價高昂的狀态替換(建立新狀态是很昂貴的),我們保持其中一個視窗作為原始的狀态視窗,其他幾個視窗的資料合并到該狀态視窗中去,比如随機選擇b作為狀态視窗,那麼a和c視窗中的資料需要合并到b視窗中去。這樣就沒有新狀态産生了,但是我們需要額外維護視窗與狀态視窗之間的映射關系(d->b),這就是<code>mergingwindowset</code>負責的工作。這個映射關系需要在失敗重新開機後能夠恢複,是以<code>mergingwindowset</code>内部也是對該映射關系做了容錯。狀态合并的工作示意圖如下所示:
然後我們來解釋下processelement的代碼,首先根據window assigner為新進入的元素配置設定視窗集合。接着進入第一個條件塊,取出目前的<code>mergingwindowset</code>。對于每個配置設定到的視窗,我們就會将其加入到<code>mergingwindowset</code>中(<code>addwindow</code>方法),由<code>mergingwindowset</code>維護視窗與狀态視窗之間的關系,并在需要視窗合并的時候,合并狀态和trigger。然後根據映射關系,取出結果視窗對應的狀态視窗,根據狀态視窗取出對應的狀态。将新進入的元素資料加入到該狀态中。最後,根據trigger結果來對視窗資料進行處理,對于session window來說,這裡都是不進行任何處理的。真正對視窗處理是由定時器逾時後對完成的視窗調用<code>processtriggerresult</code>。
<a href="http://blog.madhukaraphatak.com/introduction-to-flink-streaming-part-7">introduction to flink streaming - part 7 : implementing session windows using custom trigger</a>
[how apache flink enables new streaming applications
<a href="http://www.vldb.org/pvldb/vol8/p1792-akidau.pdf">google dataflow paper</a>
<a href="https://cloud.google.com/dataflow/model/windowing#session-windows">google dataflow document</a>
<a href="https://issues.apache.org/jira/browse/flink-3174">flink-3174: add merging windowassigner</a>