作者:邱從賢
1. Window & Time 介紹
Apache Flink(以下簡稱 Flink) 是一個天然支援無限流資料處理的分布式計算架構,在 Flink 中 Window 可以将無限流切分成有限流,是處理有限流的核心元件,現在 Flink 中 Window 可以是時間驅動的(Time Window),也可以是資料驅動的(Count Window)。
下面的代碼是在 Flink 中使用 Window 的兩個示例

2. Window API 使用
從第一部分我們已經知道 Window 的一些基本概念,以及相關 API,下面我們以一個實際例子來看看怎麼使用 Window 相關的 API。
代碼來自 flink-examples:
上面的例子中我們首先會對每條資料進行時間抽取,然後進行 keyby,接着依次調用 window(),evictor(), trigger() 以及 maxBy()。下面我們重點來看 window(), evictor() 和 trigger() 這幾個方法。
2.1 WindowAssigner, Evictor 以及 Trigger
Window 方法接收的輸入是一個WindowAssigner, WindowAssigner 負責将每條輸入的資料分發到正确的 Window 中(一條資料可能同時分發到多個 Window 中),Flink 提供了幾種通用的 WindowAssigner:tumbling window(視窗間的元素無重複),sliding window(視窗間的元素可能重複),session window 以及 global window。如果需要自己定制資料分發政策,則可以實作一個 class,繼承自 WindowAssigner。
Tumbling Window
Sliding Window
Session Window
Global Window
Evictor 主要用于做一些資料的自定義操作,可以在執行使用者代碼之前,也可以在執行使用者代碼之後,更詳細的描述可以參考 org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter 兩個方法。Flink 提供了如下三種通用的 evictor:
- CountEvictor 保留指定數量的元素
- DeltaEvictor 通過執行使用者給定的 DeltaFunction 以及預設的 threshold,判斷是否删除一個元素。
- TimeEvictor設定一個門檻值 interval,删除所有不再 max_ts - interval 範圍内的元素,其中 max_ts 是視窗内時間戳的最大值。
Evictor 是可選的方法,如果使用者不選擇,則預設沒有。
Trigger 用來判斷一個視窗是否需要被觸發,每個 WindowAssigner 都自帶一個預設的 Trigger,如果預設的 Trigger 不能滿足你的需求,則可以自定義一個類,繼承自 Trigger 即可,我們較長的描述下 Trigger 的接口以及含義:
- onElement() 每次往 window 增加一個元素的時候都會觸發
- onEventTime() 當 event-time timer 被觸發的時候會調用
- onProcessingTime() 當 processing-time timer 被觸發的時候會調用
- onMerge() 對兩個 trigger 的 state 進行 merge 操作
- clear() window 銷毀的時候被調用
上面的接口中前三個會傳回一個 TriggerResult,TriggerResult 有如下幾種可能的選擇:
- CONTINUE 不做任何事情
- FIRE 觸發 window
- PURGE 清空整個 window 的元素并銷毀視窗
- FIRE_AND_PURGE 觸發視窗,然後銷毀視窗
2.2 Time & Watermark
了解完上面的内容後,對于時間驅動的視窗,我們還有兩個概念需要澄清:Time 和 Watermark。
我們知道在分布式環境中 Time 是一個很重要的概念,在 Flink 中 Time 可以分為三種 Event-Time,Processing-Time 以及 Ingestion-Time,三者的關系我們可以從下圖中得知:
Event Time、Ingestion Time、Processing Time
Event-Time 表示事件發生的時間,Processing-Time 則表示處理消息的時間(牆上時間),Ingestion-Time 表示進入到系統的時間。
在 Flink 中我們可以通過下面的方式進行 Time 類型的設定
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 設定使用 ProcessingTime
了解了 Time 之後,我們還需要知道 Watermark 相關的概念。
我們可以考慮一個這樣的例子:某 App 會記錄使用者的所有點選行為,并回傳日志(在網絡不好的情況下,先儲存在本地,延後回傳)。A 使用者在 11:02 對 App 進行操作,B 使用者在 11:03 操作了 App,但是 A 使用者的網絡不太穩定,回傳日志延遲了,導緻我們在服務端先接受到 B 使用者 11:03 的消息,然後再接受到 A 使用者 11:02 的消息,消息亂序了。
那我們怎麼保證基于 event-time 的視窗在銷毀的時候,已經處理完了所有的資料呢?這就是 watermark 的功能所在。watermark 會攜帶一個單調遞增的時間戳 t,watermark(t) 表示所有時間戳不大于 t 的資料都已經到來了,未來小于等于 t 的資料不會再來,是以可以放心地觸發和銷毀視窗了。下圖中給了一個亂序資料流中的 Watermark 例子
2.3 遲到的資料
上面的 Watermark 讓我們能夠應對亂序的資料,但是真實世界中我們沒法得到一個完美的 Watermark 數值 — 要麼沒法擷取到,要麼耗費太大,是以實際工作中我們會使用近似 watermark — 生成 watermark(t) 之後,還有較小的機率接受到時間戳 t 之前的資料,在 Flink 中将這些資料定義為 “late elements”, 同樣我們可以在 Window 中指定是允許延遲的最大時間(預設為 0),可以使用下面的代碼進行設定
設定
allowedLateness
之後,遲來的資料同樣可以觸發視窗,進行輸出,利用 Flink 的 side output 機制,我們可以擷取到這些遲到的資料,使用方式如下:
需要注意的是,設定了 allowedLateness 之後,遲到的資料也可能觸發視窗,對于 Session window 來說,可能會對視窗進行合并,産生預期外的行為。
3. Window 内部實作
在讨論 Window 内部實作的時候,我們再通過下圖回顧一下 Window 的生命周期
每條資料過來之後,會由 WindowAssigner 配置設定到對應的 Window,當 Window 被觸發之後,會交給 Evictor(如果沒有設定 Evictor 則跳過),然後處理 UserFunction。其中 WindowAssigner,Trigger,Evictor 我們都在上面讨論過,而 UserFunction 則是使用者編寫的代碼。
整個流程還有一個問題需要讨論:Window 中的狀态存儲。我們知道 Flink 是支援 Exactly Once 處理語義的,那麼 Window 中的狀态存儲和普通的狀态存儲又有什麼不一樣的地方呢?
首先給出具體的答案:從接口上可以認為沒有差別,但是每個 Window 會屬于不同的 namespace,而非 Window 場景下,則都屬于 VoidNamespace ,最終由 State/Checkpoint 來保證資料的 Exactly Once 語義,下面我們從 org.apache.flink.streaming.runtime.operators.windowing.WindowOperator 摘取一段代碼進行闡述
從上面我們可以知道,Window 中的的元素同樣是通過 State 進行維護,然後由 Checkpoint 機制保證 Exactly Once 語義。
至此,Time、Window 相關的所有内容都已經講解完畢,主要包括為什麼要有 Window; Window 中的三個核心元件:WindowAssigner、Trigger 和 Evictor;Window 中怎麼處理亂序資料,亂序資料是否允許延遲,以及怎麼處理遲到的資料;最後我們梳理了整個 Window 的資料流程,以及 Window 中怎麼保證 Exactly Once 語義。