整理 | Aven (Flink 社群志願者)
摘要:本文由 Apache Flink Committer,位元組跳動架構研發工程師李本超分享,以四個章節來介紹 Flink 在位元組的應用實戰。 内容如下:
- 整體介紹
- 實踐優化
- 流批一體
- 未來規劃
一、整體介紹

2018 年 12 月 Blink 宣布開源,經曆了約一年的時間 Flink 1.9 于 2019 年 8 月 22 釋出。在 Flink 1.9 釋出之前位元組跳動内部基于 master 分支進行内部的 SQL 平台建構。經曆了 2~3 個月的時間位元組内部在 19 年 10 月份釋出了基于 Flink 1.9 的 Blink planner 建構的 Streaming SQL 平台,并進行内部推廣。在這個過程中發現了一些比較有意思的需求場景,以及一些較為奇怪的 BUG。
基于 1.9 的 Flink SQL 擴充
雖然最新的 Flink 版本已經支援 SQL 的 DDL,但 Flink 1.9 并不支援。位元組内部基于 Flink 1.9 進行了 DDL 的擴充支援以下文法:
- create table
- create view
- create function
- add resource
同時 Flink 1.9 版本不支援的 watermark 定義在 DDL 擴充後也支援了。
我們在推薦大家盡量的去用 SQL 表達作業時收到很多“SQL 無法表達複雜的業務邏輯”的回報。時間久了發現其實很多使用者所謂的複雜業務邏輯有的是做一些外部的 RPC 調用,位元組内部針對這個場景做了一個 RPC 的維表和 sink,讓使用者可以去讀寫 RPC 服務,極大的擴充了 SQL 的使用場景,包括 FaaS 其實跟 RPC 也是類似的。在位元組内部添加了 Redis/Abase/Bytable/ByteSQL/RPC/FaaS 等維表的支援。
同時還實作了多個内部使用的 connectors:
- source: RocketMQ
-
sink:
RocketMQ/ClickHouse/Doris/LogHouse/Redis/Abase/Bytable/ByteSQL/RPC/Print/Metrics
并且為 connector 開發了配套的 format:PB/Binlog/Bytes。
線上的界面化 SQL 平台
除了對 Flink 本身功能的擴充,位元組内部也上線了一個 SQL 平台,支援以下功能:
- SQL 編輯
- SQL 解析
- SQL 調試
- 自定義 UDF 和 Connector
- 版本控制
- 任務管理
二、實踐優化
除了對功能的擴充,針對 Flink 1.9 SQL 的不足之處也做了一些優化。
Window 性能優化
1、支援了 window Mini-Batch
Mini-Batch 是 Blink planner 的一個比較有特色的功能,其主要思想是積攢一批資料,再進行一次狀态通路,達到減少通路狀态的次數降低序列化反序列化的開銷。這個優化主要是在 RocksDB 的場景。如果是 Heap 狀态 Mini-Batch 并沒什麼優化。在一些典型的業務場景中,得到的回報是能減少 20~30% 左右的 CPU 開銷。
2、擴充 window 類型
目前 SQL 中的三種内置 window,滾動視窗、滑動視窗、session 視窗,這三種語意的視窗無法滿足一些使用者場景的需求。比如在直播的場景,分析師想統計一個主播在開播之後,每一個小時的 UV(Unique Visitor)、GMV(Gross Merchandise Volume) 等名額。自然的滾動視窗的劃分方式并不能夠滿足使用者的需求,位元組内部就做了一些定制的視窗來滿足使用者的一些共性需求。
-- my_window 為自定義的視窗,滿足特定的劃分方式
SELECT
room_id,
COUNT(DISTINCT user_id)
FROM MySource
GROUP BY
room_id,
my_window(ts, INTERVAL '1' HOURS)
3、window offset
這是一個較為通用的功能,在 Datastream API 層是支援的,但 SQL 中并沒有。這裡有個比較有意思的場景,使用者想要開一周的視窗,一周的視窗變成了從周四開始的非自然周。因為誰也不會想到 1970 年 1 月 1 号那天居然是周四。在加入了 offset 的支援後就可以支援正确的自然周視窗。
SELECT
room_id,
COUNT(DISTINCT user_id)
FROM MySource
GROUP BY
room_id,
TUMBLE(ts, INTERVAL '7' DAY, INTERVAL '3', DAY)
維表優化
1、延遲 Join
維表 Join 的場景下因為維表經常發生變化尤其是新增次元,而 Join 操作發生在次元新增之前,經常導緻關聯不上。
是以使用者希望如果 Join 不到,則暫時将資料緩存起來之後再進行嘗試,并且可以控制嘗試次數,能夠自定義延遲 Join 的規則。這個需求場景不單單在位元組内部,社群的很多同學也有類似的需求。
基于上面的場景實作了延遲 Join 功能,添加了一個可以支援延遲 Join 維表的算子。當 Join 沒有命中,local cache 不會緩存空的結果,同時将資料暫時儲存在一個狀态中,之後根據設定定時器以及它的重試次數進行重試。
2、維表 Keyby 功能
通過拓撲我們發現 Cacl 算子和 lookUpJoin 算子是 chain 在一起的。因為它沒有一個 key 的語義。
當作業并行度比較大,每一個維表 Join 的 subtask,通路的是所有的緩存空間,這樣對緩存來說有很大的壓力。
但觀察 Join 的 SQL,等值 Join 是天然具有 Hash 屬性的。直接開放了配置,運作使用者直接把維表 Join 的 key 作為 Hash 的條件,将資料進行分區。這樣就能保證下遊每一個算子的 subtask 之間的通路空間是獨立的,這樣可以大大的提升開始的緩存命中率。
除了以上的優化,還有兩點目前正在開發的維表優化。
1、廣播維表:有些場景下維表比較小,而且更新不頻繁,但作業的 QPS 特别高。如果依然通路外部系統進行 Join,那麼壓力會非常大。并且當作業 Failover 的時候 local cache 會全部失效,進而又對外部系統造成很大通路壓力。那麼改進的方案是定期全量 scan 維表,通過Join key hash 的方式發送到下遊,更新每個維表 subtask 的緩存。
2、Mini-Batch:主要針對一些 I/O 請求比較高,系統又支援 batch 請求的能力,比如說 RPC、HBase、Redis 等。以往的方式都是逐條的請求,且 Async I/O 隻能解決 I/O 延遲的問題,并不能解決通路量的問題。通過實作 Mini-Batch 版本的維表算子,大量降低維表關聯通路外部存儲次數。
Join 優化
目前 Flink 支援的三種 Join 方式;分别是 Interval Join、Regular Join、Temporal Table Function。
前兩種語義是一樣的流和流 Join。而 Temporal Table 是流和表的的 Join,右邊的流會以主鍵的形式形成一張表,左邊的流去 Join 這張表,這樣一次 Join 隻能有一條資料參與并且隻傳回一個結果。而不是有多少條都能 Join 到。
它們之間的差別列了幾點:
可以看到三種 Join 方式都有它本身的一些缺陷。
- Interval Join 目前使用上的缺陷是它會産生一個 out join 資料和 watermark 亂序的情況。
- Regular Join 的話,它最大的缺陷是 retract 放大(之後會詳細說明這個問題)。
- Temporal table function 的問題較其它多一些,有三個問題。
- 不支援 DDl
- 不支援 out join 的語義 (FLINK-7865 的限制)
- 右側資料斷流導緻 watermark 不更新,下遊無法正确計算 (FLINK-18934)
對于以上的不足之處位元組内部都做了對應的修改。
增強 Checkpoint 恢複能力
對于 SQL 作業來說一旦發生條件變化都很難從 checkpoint 中恢複。
SQL 作業确實從 checkpoint 恢複的能力比較弱,因為有時候做一些看起來不太影響 checkpoint 的修改,它仍然無法恢複。無法恢複主要有兩點;
- 第一點:operate ID 是自動生成的,然後因為某些原因導緻它生成的 ID 改變了。
- 第二點:算子的計算的邏輯發生了改變,即算子内部的狀态的定義發生了變化。
例子1:并行度發生修改導緻無法恢複。
source 是一個最常見的有狀态的算子,source 如果和之後的算子的 operator chain 邏輯發生了改變,是完全無法恢複的。
下圖左上是正常的社群版的作業會産生的一個邏輯, source 和後面的并行度一樣的算子會被 chain 在一起,使用者是無法去改變的。但算子并行度是常會會發生修改,比如說 source 由原來的 100 修改為 50,cacl 的并發是 100。此時 chain 的邏輯就會發生變化。
針對這種情況,位元組内部做了修改,允許使用者去配置,即使 source 的并行度跟後面整體的作業的并行度是一樣的,也讓其不與之後的算子 chain 在一起。
例子2:DAG 改變導緻無法恢複。
這是一種比較特殊的情況,有一條 SQL (上圖),可以看到 source 沒有發生變化,之後的三個聚合互相之間沒有關系,狀态竟然也是無法恢複。
作業之是以無法恢複,是因為 operator ID 生成規則導緻的。目前 SQL 中 operator ID 的生成的規則與上遊、本身配置以及下遊可以 chain 在一起的算子的數量都有關系。 因為新增名額,會導緻新增一個 Calc 的下遊節點,進而導緻 operator ID 發生變化。
為了處理這種情況,支援了一種特殊的配置模式,允許使用者配置生成 operator ID 的時候可以忽略下遊 chain 在一起算子數量的條件。
例子3:新增聚合名額導緻無法恢複
這塊是使用者訴求最大的,也是最複雜的部分。使用者期望新增一些聚合名額後,原來的名額要能從 checkpoint 中恢複。
可以看到圖中左部分是 SQL 生成的算子邏輯。count,sum,sum,count,distinct 會以一個 BaseRow 的結構存儲在 ValueState 中。distinct 比較特殊一些,還會單獨存儲在一個 MapState 中。
這導緻了如新增或者減少名額,都會使原先的狀态沒辦法從 ValueState 中正常恢複,因為 VauleState 中存儲的狀态 “schema” 和新的(修改名額後)的 “schema”不比對,無法正常反序列化。
在讨論解決方案之前,我們先回顧一下正常的恢複流。先從 checkpoint 中恢複出狀态的 serializer,再通過 serializer 把狀态恢複。接下來 operator 去注冊新的狀态定義,新的狀态定義會和原先的狀态定義進行一個相容性對比,如果是相容則狀态恢複成功,如果不相容則抛出異常任務失敗。
不相容的另一種處理情況是允許傳回一個 migration(實作兩個不比對類型的狀态恢複)那麼也可以恢複成功。
針對上面的流程做出對應的修改:
- 第一步使新舊 serializer 互相知道對方的資訊,添加一個接口,且修改了 statebackend resolve compatibility 的過程,把舊的資訊傳遞給新的,并使其擷取整個 migrate 過程。
- 第二步判斷新老之間是否相容,如果不相容是否需要做一次 migration。然後讓舊的 serializer 去恢複一遍狀态,并使用新的 serializer 寫入新的狀态。
- 對 aggregation 的代碼生成進行處理,當發現 aggregation 拿到的是名額是 null,那麼将做一些初始化的工作。
通過以上的修改基本就可以做到正常的,新增的聚合名額從拆開的方案恢複。
三、流批一體探索
業務現狀
位元組跳動内部對流批一體和業務推廣之前,技術團隊提前做了大量技術方面的探索。整體判斷是 SQL 這一層是可以做到流批一體的語義,但實踐中卻又發現不少不同。
比如說流計算的 session window,或是基于處理時間的 window,在批計算中無法做到。同時 SQL 在批計算中一些複雜的 over window,在流計算中也沒有對應的實作。
但這些特别的場景可能隻占 10% 甚至更少,是以用 SQL 去落實流批一體是可行的。
這張圖是比較常見的和大多數公司裡的架構都類似。這種架構有什麼缺陷呢?
- 資料不同源:批任務一般會有一次前置處理任務,不管是離線的也好實時的也好,預先進過一層加工後寫入 Hive。而實時任務是從 kafka 讀取原始的資料,可能是 json 格式,也可能是 avro 等等。直接導緻批任務中可執行的 SQL 在流任務中沒有結果生成或者執行結果不對。
- 計算不同源:批任務一般是 Hive + Spark 的架構,而流任務基本都是基于 Flink。不同的執行引擎在實作上都會有一些差異,導緻結果不一緻。不同的執行引擎有不同的 API 定義 UDF,它們之間也是無法被公用的。大部分情況下都是維護兩套基于不同 API 實作的相同功能的 UDF。
鑒于上面的問題,提出了基于 Flink 的流批一體架構來解決。
- 資料不同源:流式處理先通過 Flink 處理之後寫入 MQ 供下遊流式 Flink job 去消費,對于批式處理由 Flink 處理後流式寫入到 Hive,再由批式的 Flink job 去處理。
- 引擎不同源:既然都是基于 Flink 開發的流式,批式 job,自然沒有計算不同源問題,同時也避免了維護多套相同功能的 UDF。
基于 Flink 實作的流批一體架構:
業務收益
- 統一的 SQL:通過一套 SQL 來表達流和批計算兩種場景,減少開發維護工作。
- 複用 UDF:流式和批式計算可以共用一套 UDF。這對業務來說是有積極意義的。
- 引擎統一:對于業務的學習成本和架構的維護成本都會降低很多。
- 優化統一:大部分的優化都是可以同時作用在流式和批式計算上,比如對 planner、operator 的優化流和批可以共享。
四、未來工作和規劃
優化 retract 放大問題
什麼是 retract 放大?
上圖有 4 張表,第一張表進行去重操作 (Dedup),之後分别和另外三張表做 Join。邏輯比較簡單,表 A 輸入(A1),最後産出 (A1,B1,C1,D1) 的結果。
當表 A 輸入一個 A2,因為 Dedup 算子,導緻資料需要去重,則向下遊發送一個撤回 A1 的操作 -(A1) 和一個新增 A2 的操作 +(A2)。第一個 Join 算子收到 -(A1) 後會将 -(A1) 變成 -(A1,B1) 和 +(null,B1)(為了保持它認為的正确語義) 發送到下遊。之後又收到了 +(A2) ,則又向下遊發送 -(null,B1) 和 +(A2,B1) 這樣操作就放大了兩倍。再經由下遊的算子操作會一直被放大,到最終的 sink 輸出可能會被放大 1000 倍之多。
如何解決?
将原先 retract 的兩條資料變成一條 changelog 的格式資料,在算子之間傳遞。算子接收到 changelog 後處理變更,然後僅僅向下遊發送一個變更 changelog 即可。
1.功能優化
- 支援所有類型聚合名額變更的 checkpoint 恢複能力
- window local-global
- 事件時間的 Fast Emit
- 廣播維表
- 更多算子的 Mini-Batch 支援:維表,TopN,Join 等
- 全面相容 Hive SQL 文法
2.業務擴充
- 進一步推動流式 SQL 達到 80%
- 探索落地流批一體産品形态
- 推動實時數倉标準化