天天看點

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作
簡介:

本文根據 Apache Flink 系列直播整理而成,由 Apache Flink Contributor、OPPO 大資料平台研發負責人張俊老師分享。主要内容如下: 1. 整體思路與學習路徑 2. 應用場景與程式設計模型 3. 工作流程與實作機制

作者 | 張俊(OPPO大資料平台研發負責人)

整理 | 祝尚(Flink 社群志願者)

校對 | 鄒志業(Flink 社群志願者)

摘要:本文根據 Apache Flink 系列直播整理而成,由 Apache Flink Contributor、OPPO 大資料平台研發負責人張俊老師分享。主要内容如下:

  1. 整體思路與學習路徑
  2. 應用場景與程式設計模型
  3. 工作流程與實作機制

Tips:點選「下方連結」可檢視更多數倉系列直播視訊~

數倉系列直播:

https://ververica.cn/developers/flink-training-course-data-warehouse/

整體思路與學習路徑

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

當我們碰到一項新的技術時,我們應該怎樣去學習并應用它呢?在我個人看來,有這樣一個學習的路徑,應該把它拆成應用和實作兩塊。首先應該從它的應用入手,然後再深入它的實作。

應用主要分為三個部分,首先應該了解它的應用場景,比如視窗的一些使用場景。然後,進一步地我們去了解它的程式設計接口,最後再深入了解它的一些抽象概念。因為一個架構或一項技術,肯定有它的程式設計接口和抽象概念來組成它的程式設計模型。我們可以通過檢視文檔的方式來熟悉它的應用。在對應用這三個部分有了初步的了解後,我們就可以通過閱讀代碼的方式去了解它的一些實作了。

實作部分也分三個階段,首先從工作流程開始,可以通過 API 層面不斷的下鑽來了解它的工作流程。接下來是它整體的設計模式,通常對一些架構來說,如果能建構一個比較成熟的生态,一定是在設計模式上有一些獨特的地方,使其有一個比較好的擴充性。最後是它的資料結構和算法,因為為了能夠處理海量資料并達到高性能,它的資料結構和算法一定有獨到之處。我們可以做些深入了解。

以上大概是我們學習的一個路徑。從實作的角度可以反哺到應用上來,通常在應用當中,剛接觸某個概念的時候會有一些疑惑。當我們對實作有一些了解之後,應用中的這些疑惑就會迎刃而解。

為什麼要關心實作

舉個例子:

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

看了這個例子我們可能會有些疑惑:

  • ReduceFunction 為什麼不用計算每個 key 的聚合值?
  • 當 key 基數很大時,如何有效地觸發每個 key 視窗計算?
  • 視窗計算的中間結果如何存儲,何時被清理?
  • 視窗計算如何容忍 late data ?

當你了解了實作部分再回來看應用這部分,可能就有種醍醐灌頂的感覺。

應用場景與程式設計模型

實時數倉的典型架構

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作
■ 第一種最簡單架構

,ODS 層的 Kafka 資料經過 Flink 的 ETL 處理後寫入 DW 層的 Kafka,再通過 Flink 聚合寫入 ADS 層的 MySQL 中,做這樣一個實時報表展現。

缺點

:由于 MySQL 存儲資料有限,是以聚合的時間粒度不能太細,次元組合不能太多。

■ 第二種架構

相對于第一種引入了 OLAP 引擎,同時也不用 Flink 來做聚合,通過 Druid 的 Rollup 來做聚合。

缺點

:因為 Druid 是一個存儲和查詢引擎,不是計算引擎。當資料量巨大時,比如每天上百億、千億的資料量,會加劇 Druid 的導入壓力。

■ 第三種架構

在第二種基礎上,采用 Flink 來做聚合計算寫入 Kafka,最終寫入 Druid。

缺點

:當視窗粒度比較長時,結果輸出會有延遲。

■ 第四種架構在第三種基礎上,結合了 Flink 聚合和 Druid Rollup。Flink 可以做輕度的聚合,Druid 做 Rollup 的彙總。好處是 Druid 可以實時看到 Flink 的聚合結果。

Window 應用場景

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作
■ 聚合統計

:從 Kafka 讀取資料,根據不同的次元做1分鐘或5分鐘的聚合計算,然後結果寫入 MySQL 或 Druid 中。

■ 記錄合并

:對多個 Kafka 資料源在一定的視窗範圍内做合并,結果寫入 ES。例如:使用者的一些行為資料,針對每個使用者,可以對其行為做一定的合并,減少寫入下遊的資料量,降低 ES 的寫入壓力。

■ 雙流 join

:針對雙流 join 的場景,如果全量 join 的話,成本開銷會非常大。是以就要考慮基于視窗來做 join。

Window 抽象概念

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作
■ TimestampAssigner:

時間戳配置設定器,假如我們使用的是 EventTime 時間語義,就需要通過 TimestampAssigner 來告訴Flink 架構,元素的哪個字段是事件時間,用于後面的視窗計算。

■ KeySelector

:Key 選擇器,用來告訴 Flink 架構做聚合的次元有哪些。

■ WindowAssigner

:視窗配置設定器,用來确定哪些資料被配置設定到哪些視窗。

■ State

:狀态,用來存儲視窗内的元素,如果有 AggregateFunction,則存儲的是增量聚合的中間結果。

■ AggregateFunction(可選)

:增量聚合函數,主要用來做視窗的增量計算,減輕視窗内 State 的存儲壓力。

■ Trigger

:觸發器,用來确定何時觸發視窗的計算。

■ Evictor(可選)

:驅逐器,用于在視窗函數計算之前(後)對滿足驅逐條件的資料做過濾。

■ WindowFunction

:視窗函數,用來對視窗内的資料做計算。

■ Collector

:收集器,用來将視窗的計算結果發送到下遊。

上圖中紅色部分都是可以自定義的子產品,通過自定義這些子產品的組合,我們可以實作進階的視窗應用。同時 Flink 也提供了一些内置的實作,可以用來做一些簡單應用。

Window 程式設計接口

stream .assignTimestampsAndWatermarks(…) <- TimestampAssigner .keyBy(...) <- KeySelector .window(...) <- WindowAssigner [.trigger(...)] <- Trigger [.evictor(...)] <- Evictor .reduce/aggregate/process() <- Aggregate/Window function

首先我們先指定時間戳和 Watermark 如何生成;然後選擇需要聚合的次元的 Key;再選擇一個視窗和選擇用什麼樣的觸發器來觸發視窗計算,以及選擇驅逐器做什麼樣的過濾;最後确定視窗應該做什麼樣計算。

下面是一個示例:

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

接下來我們詳細看下每個子產品。

■ Window Assigner
flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

總結一下主要有3類視窗:

  • Time Window
  • Count Window
  • Custom Window
■ Window Trigger

Trigger 是一個比較重要的概念,用來确定視窗什麼時候觸發計算。

Flink 内置了一些 Trigger 如下圖:

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作
■ Trigger 示例
flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

假如我們定義一個5分鐘的基于 EventTime 的滾動視窗,定義一個每2分觸發計算的 Trigger,有4條資料事件時間分别是20:01、20:02、20:03、20:04,對應的值分别是1、2、3、2,我們要對值做 Sum 操作。

初始時,State 和 Result 中的值都為0。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

當第一條資料在20:01進入視窗時,State 的值為1,此時還沒有到達 Trigger 的觸發時間。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

第二條資料在20:02進入視窗,State 中的值為1+2=3,此時達到2分鐘滿足 Trigger 的觸發條件,是以 Result 輸出結果為3。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

第三條資料在20:03進入視窗,State 中的值為3+3 = 6,此時未達到 Trigger 觸發條件,沒有結果輸出。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

第四條資料在20:04進入視窗,State中的值更新為6+2=8,此時又到了2分鐘達到了 Trigger 觸發時間,是以輸出結果為8。如果我們把結果輸出到支援 update 的存儲,比如 MySQL,那麼結果值就由之前的3更新成了8。

■ 問題:如果 Result 隻能 append?
flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

如果 Result 不支援 update 操作,隻能 append 的話,則會輸出2條記錄,在此基礎上再做計算處理就會引起錯誤。

這樣就需要 PurgingTrigger 來處理上面的問題。

■ PurgingTrigger 的應用
flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

和上面的示例一樣,唯一的不同是在 ContinuousEventTimeTrigger 外面包裝了一個 PurgingTrigger,其作用是在 ContinuousEventTimeTrigger 觸發視窗計算之後将視窗的 State 中的資料清除。

再看下流程:

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

前兩條資料先後于20:01和20:02進入視窗,此時 State 中的值更新為3,同時到了Trigger的觸發時間,輸出結果為3。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

由于 PurgingTrigger 的作用,State 中的資料會被清除。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

當後兩條資料進入視窗之後,State 重新從0開始累計并更新為5,輸出結果為5。

由于結果輸出是 append 模式,會輸出3和5兩條資料,然後再做 Sum 也能得到正确的結果。

上面就是 PurgingTrigger 的一個簡單的示例,它還支援很多有趣的玩法。

■ DeltaTrigger 的應用

有這樣一個車輛區間測試的需求,車輛每分鐘上報目前位置與車速,每行進10公裡,計算區間内最高車速。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

首先需要考慮的是如何來劃分視窗,它不是一個時間的視窗,也不是一個基于數量的視窗。用傳統的視窗實作比較困難,這種情況下我們考慮使用 DeltaTrigger 來實作。

下面是簡單的代碼實作:

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

如何提取時間戳和生成水印,以及選擇聚合次元就不贅述了。這個場景不是傳統意義上的時間視窗或數量視窗,可以建立一個 GlobalWindow,所有資料都在一個視窗中,我們通過定義一個 DeltaTrigger,并設定一個門檻值,這裡是10000(米)。每個元素和上次觸發計算的元素比較是否達到設定的門檻值,這裡比較的是每個元素上報的位置,如果達到了10000(米),那麼目前元素和上一個觸發計算的元素之間的所有元素落在同一個視窗裡計算,然後可以通過 Max 聚合計算出最大的車速。

■ 思考點

上面這個例子中我們通過 GlobalWindow 和 DeltaTrigger 來實作了自定義的 Window Assigner 的功能。對于一些複雜的視窗,我們還可以自定義 WindowAssigner,但實作起來不一定簡單,倒不如利用 GlobalWindow 和自定義 Trigger 來達到同樣的效果。

下面這個是 Flink 内置的 CountWindow 的實作,也是基于 GlobalWindow 和 Trigger 來實作的。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作
■ Window Evictor

Flink 内置了一些 Evictor 的實作。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作
■ TimeEvictor 的應用

基于上面的區間測速的場景,每行進10公裡,計算區間内最近15分鐘最高車速。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

實作上隻是在前面基礎上增加了 Evictor 的使用,過濾掉視窗最後15分鐘之前的資料。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作
■ Window Function

Flink 内置的 WindowFunction 有兩種類型,第一種是 AggregateFunction,它是進階别的抽象,主要用來做增量聚合,每來一條元素都做一次聚合,這樣狀态裡隻需要存最新的聚合值。

  • 優點:增量聚合,實作簡單。
  • 缺點:輸出隻有一個聚合值,使用場景比較局限。
flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

第二種是 ProcessWindowFunction,它是低級别的抽象用來做全量聚合,每來一條元素都存在狀态裡面,隻有當視窗觸發計算時才會調用這個函數。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作
  • 優點:可以擷取到視窗内所有資料的疊代器,實作起來比較靈活;可以擷取到聚合的 Key 以及可以從上下文 Context 中擷取視窗的相關資訊。
  • 缺點:需要存儲視窗内的全量資料,State 的壓力較大。

同時我們可以把這兩種方式結合起來使用,通過 AggregateFunction 做增量聚合,減少中間狀态的壓力。通過 ProcessWindowFunction 來輸出我們想要的資訊,比如聚合的 Key 以及視窗的資訊。

工作流程和實作機制

上一節我們介紹了視窗的一些抽象的概念,包括它的程式設計接口,通過一些簡單的示例介紹了每個抽象概念的的用法。

這一節我們深入的研究以下視窗底層是怎麼實作的。

WindowOperator 工作流程

首先看下 WindowOperator 的工作流程,代碼做了一些簡化,隻保留了核心步驟。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

主要包括以下8個步驟:

  1. 擷取 element 歸屬的 windows
  2. 擷取 element 對應的 key
  3. 如果 late data,跳過
  4. 将 element 存入 window state
  5. 判斷 element 是否觸發 trigger
  6. 擷取 window state,注入 window function
  7. 清除 window state
  8. 注冊 timer,到視窗結束時間清理 window

Window State

前面提到的增量聚合計算和全量聚合計算,這兩個場景所應用的 State 是不一樣的。

如果是全量聚合,元素會添加到 ListState 當中,當觸發視窗計算時,再把 ListState 中所有元素傳遞給視窗函數。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

如果是增量計算,使用的是 AggregatingState,每條元素進來會觸發 AggregateTransformation 的計算。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

看下 AggregateTransformation 的實作,它會調用我們定義的 AgregateFunction 中的 createAccumulator 方法和 add 方法并将 add 的結果傳回,是以 State 中存儲的就是 accumulator 的值,是以比較輕量級。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

Window Function

在觸發視窗計算時會将視窗中的狀态傳遞給 emitWindowContents 方法。這裡會調用我們定義的視窗函數中的 process 方法,将目前的 Key、Window、上下文 Context、視窗的内容作為參數傳給它。在此之前和之後會分别調用 evictBefore 和evictAfter 方法把一些元素過濾掉。最終會調用 windowState 的 clear 方法,再把過濾之後的記錄存到 windowState 中去。進而達到 evictor 過濾元素的效果。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

Window Trigger

最後看下 Trigger 的實作原理。當我們有大量的 Key,同時每個 Key 又屬于多個視窗時,我們如何有效的觸發視窗的計算呢?

Flink 利用定時器來保證視窗的觸發,通過優先級隊列來存儲定時器。隊列頭的定時器表示離目前時間最近的一個,如果目前定時器比隊列頭的定時器時間還要早,則取消掉隊列頭的定時器,把目前的時間注冊進去。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作
flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

當這次定時器觸發之後,再從優先級隊列中取下一個 Timer,去調用 trigger 處理的函數,再把下一個 Timer 的時間注冊為定時器。這樣就可以循環疊代下去。

flink 自定義 視窗_數倉系列 | Flink 視窗的應用與實作

總結

本文主要分享了 Flink 視窗的應用與實作。首先介紹了學習一項新技術的整體思路與學習路徑,從應用入手慢慢深入它的實作。然後介紹了實時數倉的典型架構發展曆程,之後從視窗的應用場景、抽象概念、程式設計結構詳細說明了視窗的各個組成部分。并通過一些示例詳細展示了各個概念之間配合使用可以滿足什麼樣的使用場景。最後深入視窗的實作,從源碼層面說明了視窗各子產品的工作流程。

存儲 消息中間件 設計模式 druid 算法 關系型資料庫 MySQL Kafka Apache 流計算

繼續閱讀