本文轉載自「之家技術」,作者劉首維。介紹了汽車之家在基于 Flink 的實時物化視圖的一些實踐經驗與探索,并嘗試讓使用者直接以批處理 SQL 的思路開發 Flink Streaming SQL 任務。主要内容為:
- 系統分析與問題拆解
- 問題解決與系統實作
- 實時物化視圖實踐
- 限制與不足
- 總結與展望
GitHub 位址
https://github.com/apache/flink歡迎關注 Flink~
前言
物化視圖這一功能想必大家都不陌生,我們可以通過使用物化視圖,将預先設定好的複雜 SQL 邏輯,以增量疊代的形式實時 (按照事務地) 更新結果集,進而通過查詢結果集來避免每次查詢複雜的開銷,進而節省時間與計算資源。事實上,很多資料庫系統和 OLAP 引擎都不同程度地支援了物化視圖。另一方面,Streaming SQL 本身就和物化視圖有着很深的聯系,那麼基于 Apche Flink (下稱 Flink) SQL 去做一套實時物化視圖系統是一件十分自然而然的事情了。
本文介紹了汽車之家 (下稱之家) 在基于 Flink 的實時物化視圖的一些實踐經驗與探索,并嘗試讓使用者直接以批處理 SQL 的思路開發 Flink Streaming SQL 任務。希望能給大家帶來一些啟發,共同探索這一領域。
一、系統分析與問題拆解
Flink 在 Table & SQL 子產品做了大量的工作,Flink SQL 已經實作了一套成熟與相對完備的 SQL 系統,同時,我們也在 Flink SQL 上有着比較多的技術和産品積累,直接基于 Flink SQL 本身就已經解決了建構實時物化系統的大部分問題,而唯一一個需要我們解決的問題是如何不重不漏地生成資料源表對應的語義完備的 Changelog DataStream,包括增量和全量曆史兩部分。
雖然規約到隻剩一個問題,但是這個問題解決起來還是比較困難的,那我們将這個問題繼續拆解為以下幾個子問題:
1. 加載全量資料;
2. 加載增量資料;
3. 增量資料與全量資料整合。
二、問題解決與系統實作
問題一:基于資料傳輸平台的增量資料讀取
增量資料加載還是相對比較好解決的,我們直接複用實時資料傳輸平台的基礎建設。資料傳輸平台[1] 已經将 Mysql / SqlServer / TiDB 等增量資料以統一的資料格式寫入到特定的 Kafka Topic 中,我們隻要擷取到對應的 Kafka Topic 就可以進行讀取即可。
問題二:支援 checkpoint 的全量資料加載
對于全量資料載入,我們先後寫了兩個版本。
第一版我們用 Legacy Source 寫了一套
BulkLoadSourceFunction
,這一版的思路比較樸素,就是全量從資料源表進行查詢。這個版本确實能完成全量資料的加載,但是問題也是比較明顯的。如果在 bulk load 階段作業發生了重新開機,我們就不得不重新進行全量資料加載。對于資料量大的表,這個問題帶來的後果還是比較嚴重的。
對于第一版的固有問題,我們一直都沒有特别好的對策,直到 Flink-CDC[2] 2.0 的釋出。我們參考了 Flink-CDC 的全量資料加載階段支援 Checkpoint 的思路,基于 FLIP-27 開發了新的
BulkLoadSource
。第二版不論在性能上還是可用性上,對比第一版都有了大幅提升。
問題三:基于全局版本的輕量 CDC 資料整合算法
這三個子問題中,問題三的難度是遠大于前面兩個子問題的。這個問題的樸素思路或許很簡單,我們隻要按照 Key 緩存全部資料,然後根據增量資料流來觸發 Changelog DataStream 更新即可。
事實上我們也曾按照這個思路開發了一版整合邏輯的算子。這版算子對于小表還是比較 work 的,但是對于大表,這種思路固有的 overhead 開始變得不可接受。我們曾用一張資料量在 12 億,大小約 120G 的 SqlServer 表進行測試,本身就巨大的資料再加上 JVM 上不可避免的膨脹,狀态大小變得比較誇張。經過這次測試,我們一緻認為這樣粗放的政策似乎不适合作為生産版本釋出,于是我們不得不開始重新思考資料整合的算法與政策。
在談論我們的算法設計思路之前,我不得不提到 DBLog[3] 的算法設計, 這個算法的核心思路利用 watermark 對曆史資料進行辨別,并和對應的增量資料進行合并,達到不使用鎖即可完成整個增量資料和曆史資料的整合,Flink-CDC 也是基于這個思路進行的實作與改進。在相關資料搜集和分析的過程中,我們發現我們的算法思路與 DBLog 的算法的核心思路非常相似, 但是是基于我們的場景和情況進行了設計與特化。
首先分析我們的情況:
- 增量資料需要來自于資料傳輸平台的 Kafka Topic;
- 增量資料的是 at least once 的;
- 增量資料是存在全序版本号的。
結合上述情況進行分析,我們來規約一下這個算法必須要達成的目标:
- 保證資料的 Changelog Stream,資料完整,Event (RowKind) 語義完備;
- 保證該算法的 overhead 是可控的;
- 保證算法實作的處理性能是足夠高效;
- 保證算法實作不依賴任何來自于 Flink 外部的系統或者功能。
經過大家的分析與讨論後,我們設計出了一套資料整合的算法,命名為 Global Version Based Pause-free Change-Data-Capture Algorithm。
3.1 算法原理
我們同時讀入
BulkLoadSource
的全量資料與
RealtimeChangelogSource
增量資料,并根據主鍵進行 KeyBy 與 Connect,而算法的核心邏輯主要由之後的 KeyedCoProcess 階段完成。下面交待幾個關鍵的字段值:
- SearchTs:全量資料從資料源查詢出來的時間戳;
- Watermark:基于增量資料在資料庫裡産生的時間戳生成;
- Version:全序版本号,全量資料是 0,即一定最小版本。
KeyedCoProcess 收到全量資料後,不會直接發送,而是先緩存起來,等到 Watermark 的值大于該 SearchTs 後發送并清除對應 version0 版本資料的緩存。在等待的期間,如果有對應的 Changlog Data,就将被緩存的 Version0 全量資料丢棄,然後處理 Changelog Data 并發送。在整個資料處理的流程中,全量資料和增量資料都是同時進行消費與處理的,完全不需要引入暫停階段來進行資料的整合。

增量資料在全量資料發送 watermark 之前到來,隻發送增量資料即可,全量資料直接丢棄
全量資料發送 watermark 到達後,仍未有對應的增量資料,直接發送全量資料
3.2 算法實作
我們決定以 Flink Connector 的形式開展算法的實作,我們以接入 SDK 的名字 Estuary 為該 Connector 命名。通過使用
DataStreamScanProvider
,來完成 Source 内部算子間的串聯,Source 的算子組織如下圖 (chain 到一起的算子已拆開展示)。
-
/BulkLoadSource
主要負責資料的讀入和統一格式處理;ChangelogSource
-
BulkNormalize
主要是負責處理資料運作時資訊的添加與覆寫,主鍵語義處理等工作;ChangelogNormalize
-
是針對算法工作需求定制的 Watermark 生成邏輯的算子;WatermarkGenerator
- 而
就是核心的處理合并邏輯和 RowKind 語義完備性的算子。VersionBasedKeyedCoProcess
算法實作的過程中還是有很多需要優化或者進行權衡的點。全量資料進入 CoProcess 資料後,會首先檢查目前是否處理過更大版本的資料,如果沒有的話才進行處理,資料首先會被存入 State 中并根據 SearchTs + T (T 是我們設定的固有時延) 注冊 EventTimeTimer。如果沒有高版本的資料到來,定時器觸發發送 Version 0 的資料,否則直接抛棄改為發送 RowKind 語義處理好的高版本增量資料。
另一方面,避免狀态的無限增長,當系統判定 BulkLoad 階段結束後,會結束對相關 Flink State 的使用,存在的 State 隻要等待 TTL 過期即可。
另外,我們針對在資料同步且下遊 Sink 支援 Upsert 能力的場景下,開發了特别優化的超輕量模式,可以以超低的 overhead 完成全量+增量的資料同步。
開發完成後,我們的反複測試修改與驗證,完成 MVP 版本的開發。
三、實時物化視圖實踐
MVP 版本釋出後,我們與使用者同學一起,進行了基于 Flink 的物化視圖試點。
1. 基于多資料源複雜邏輯的 Data Pipeline 實時化
下面是使用者的一個真實生産需求:有三張表,分别來自于 TiDB /。SqlServer / Mysql,資料行數分别為千萬級 / 億級 / 千萬級,計算邏輯相對複雜,涉及到去重,多表 Join。原有通過離線批處理産生 T+1 的結果表。而使用者希望盡可能降低該 Pipeline 的延遲。
由于我們使用的 TiCDC Update 資料尚不包含 -U 部分,故 TiDB 表的整合算法還是采取 Legacy Mode 進行加載。
我們與使用者溝通,建議他們以批處理的思路去編寫 Flink SQL,把結果的明細資料的資料輸出到 StarRocks 中。使用者也在我們的協助下,較為快速地完成了 SQL 的開發,任務的計算拓補圖如下:
結果是相當讓人驚喜的!我們成功地在保證了資料準确性的情況下,将原來天級延遲的 Pipeline 降低至 10s 左右的延遲。資料也從原來查詢 Hive 變為查詢 StarRocks,不論從資料接入,資料預計算,還是資料計算與查詢,實作了全面的實時化。另一方面,三張表每秒的增量最大不超過 300 條,且該任務不存在更新放大的問題,是以資源使用相當的少。根據監控回報的資訊,初始化階段完成後,整個任務 TM 部分隻需要使用 1 個 Cpu (on YARN),且 Cpu 使用常态不超過 20%。對比原來批處理的資源使用,無疑也是巨大提升。
2. 資料湖場景優化
正如上文提到的,對于資料同步,我們做了專門的優化。隻需要使用專用的 Source 表,就可以一鍵開啟曆史資料 + 增量資料資料同步,大大簡化了資料同步的流程。我們目前嘗試使用該功能将資料同步至基于 Iceberg 的資料湖中,從資料同步層面大幅提升資料新鮮度。
四、限制與不足
雖然我們在這個方向的探索取得了一定成果,但是仍有一定的限制和不足。
1. 伺服器時鐘的隐式依賴
仔細閱讀上面算法原理,我們會發現,不論是 SearchTs 的生成還是 Watermark 的生成,實際上最後都依賴了伺服器系統的時鐘,而非依賴類似 Time Oracle 機制。我們雖然算法實作上引入固有延遲去規避這個問題,但是如果伺服器出現非常嚴重時鐘不一緻,超過固有延遲的話,此時 watermark 是不可靠的,有可能會造成處理邏輯的錯誤。
經确認,之家伺服器時鐘會進行校準操作。
2. 一緻性與事務
事實上我們目前這套實作沒有任何事務相關的保證機制,僅能承諾結果的最終一緻性,最終一緻性其實是一種相當弱的保證。就拿上文提到的例子來說,如果其中一張表存在 2 個小時的消費延遲,另一張表基本不存在延遲,這個時候兩表 Join 産生的結果其實是一種中間狀态,或者說對于外部系統應該是不可見的。
為了完成更高的一緻性保證,避免上面問題的産生,我們自然會想到引入事務送出機制。然而目前我們暫時沒有找到比較好的實作思路,但是可以探讨下我們目前的思考。
2.1 如何定義事務
事務這個概念想必大家或多或少都有認識,在此不多贅述。如何資料庫系統内部定義事務是一件特别自然且必要的事情,但是如何在這種跨資料源場景下定義事務,其實是一件非常困難的事情。還是以上文的例子來展開,我們能看到資料源來自各種不同資料庫,我們其實對于單表記錄了對應的事務資訊,但是确實沒有辦法定義來自不同資料源的統一事務。我們目前的樸素思路是根據資料産生的時間為基準,結合 checkpoint 統一劃定 Epoch,實作類似 Epoch-based Commit 的送出機制。但是這樣做又回到前面提到的問題,需要對伺服器時間産生依賴,無法從根源保證正确性。
2.2 跨表事務
對于 Flink 物化視圖一緻性送出這個問題,TiFlink[4] 已經做了很多相關工作。但是我們的 Source 來自不同資料源,且讀取自 Kafka,是以問題變得更為複雜,還是上面提到的例子,兩張表 Join 過後,如果想保證一緻性,不隻是 Source 和 Sink 算子,整個關系代數算子體系都需要考慮引入事務送出的概念和機制,進而避免中間狀态的對外部系統的釋出。
3. 更新放大
這個問題其實比較好了解。現在有兩張表 join,對于左表的每一行資料,對應右表都有 n (n > 100) 條資料與之對應。那麼現在更新左表的任意一行,都會有 2n 的更新放大。
4. 狀态大小
目前整套算法在全量同步階段的 Overhead 雖然可控,但是仍有優化空間。我們目前實測,對于一張資料量在 1 億左右的表,在全量資料階段,需要峰值最大為 1.5G 左右的 State。我們打算在下個版本繼續優化狀态大小,最直接的思路就是
BulkSource
通知
KeyedCoProcess
哪些主鍵集合是已經處理完畢的,這樣可以使對應的 Key 提早進入全量階段完成模式,進而進一步優化狀态大小。
五、總結與展望
本文分析了基于 Flink 物化視圖實作的問題與挑戰,着重介紹了處理生成完整的 Changelog DataStream 的算法與實作和在業務上的收益,也充分闡述了目前的限制與不足。
雖然這次實踐的結果稱不上完備,存在一些問題亟待解決,但是我們仍看到了巨大的突破與進步,不論是從技術還是業務使用上。我們充分相信未來這項技術會越來越成熟,越來越被更多人認可和使用,也通過此次探索充分驗證了流處理和批處理的統一性。
我們目前的實作還處在早期版本,仍有着工程優化和 bug fix 的空間與工作 (比如前文提到的兩表的推進的 skew 太大問題,可以嘗試引入 Coordinator 進行調節與對齊),但是相信随着不斷的疊代與發展,這項工作會變得越來越穩固,進而支撐更多業務場景,充分提升資料處理的品質與效率!
特别鳴謝張茄子和雲邪老師的幫助與勘誤。
引用
[1]
http://mp.weixin.qq.com/s/KQH-relbrZ2GUqdmaTWx6Q[2]
http://github.com/ververica/flink-cdc-connectors[3]
http://arxiv.org/pdf/2010.12597.pdf[4]
http://zhuanlan.zhihu.com/p/422931694近期熱點
更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群
第一時間擷取最新技術文章和社群動态,請關注公衆号~
活動推薦
阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:
99 元試用
實時計算Flink版(包年包月、10CU)即有機會獲得 Flink 獨家定制衛衣;另包 3 個月及以上還有 85 折優惠!
了解活動詳情:
https://www.aliyun.com/product/bigdata/sc