天天看點

基于 Flink+Iceberg 建構企業級實時資料湖

Apache Flink 是大資料領域非常流行的流批統一的計算引擎,資料湖是順應雲時代發展潮流的新型技術架構。那麼當 Apache Flink 遇見資料湖時,會碰撞出什麼樣的火花呢?本次分享主要包括以下核心内容:
  1. 資料湖的相關背景介紹;
  2. 經典業務場景介紹;
  3. 為什麼選擇 Apache Iceberg;
  4. 如何通過 Flink+Iceberg 實作流式入湖
  5. 社群未來規劃工作。
視訊回顧: https://www.bilibili.com/video/BV14A411J7e6?p=4

資料湖的相關背景介紹

資料湖是個什麼概念呢?一般來說我們把一家企業産生的資料都維護在一個平台内,這個平台我們就稱之為“資料湖”。

看下面這幅圖,這個湖的資料來源多種多樣,有的可能是結構化資料,有的可能是非結構資料,有的甚至是二進制資料。有一波人站在湖的入口,用裝置在檢測水質,這對應着資料湖上的流處理作業;有一批抽水機從湖裡面抽水,這對應着資料湖的批處理作業;還有一批人在船頭釣魚或者在岸上捕魚,這對應着資料科學家從資料湖中通過機器學習的手段來提取資料價值。

基于 Flink+Iceberg 建構企業級實時資料湖
  1. 我們總結起來,其實資料湖主要有 4 個方面的特點。
  2. 第一個特點是存儲原始資料,這些原始資料來源非常豐富;
  3. 第二個特點是支援多種計算模型;
  4. 第三個特點是有完善的資料管理能力,要能做到多種資料源接入,實作不同資料之間的連接配接,支援 schema 管理和權限管理等;
  5. 第四個特點是靈活的底層存儲,一般用 ds3、oss、hdfs 這種廉價的分布式檔案系統,采用特定的檔案格式和緩存,滿足對應場景的資料分析需求。
基于 Flink+Iceberg 建構企業級實時資料湖

那麼開源資料湖架構一般是啥樣的呢?這裡我畫了一個架構圖,主要分為四層:

  1. 最底下是分布式檔案系統,雲上使用者 S3 和 oss 這種對象存儲會用的更多一些,畢竟價格便宜很多;非雲上使用者一般采用自己維護的 HDFS。
  2. 第二層是資料加速層。資料湖架構是一個存儲計算徹底分離的架構,如果所有的資料通路都遠端讀取檔案系統上的資料,那麼性能和成本開銷都很大。如果能把經常通路到的一些熱點資料緩存在計算節點本地,這就非常自然的實作了冷熱分離,一方面能收獲到不錯的本地讀取性能,另一方面還節省了遠端通路的帶寬。這一層裡面,我們一般會選擇開源的 alluxio,或者選擇阿裡雲上的 Jindofs。
  3. 第三層就是 Table format 層,主要是把一批資料檔案封裝成一個有業務意義的 table,提供 ACID、snapshot、schema、partition 等表級别的語義。一般對應這開源的 Delta、Iceberg、Hudi 等項目。對一些使用者來說,他們認為Delta、Iceberg、Hudi 這些就是資料湖,其實這幾個項目隻是資料湖這個架構裡面的一環,隻是因為它們離使用者最近,屏蔽了底層的很多細節,是以才會造成這樣的了解。
  4. 最上層就是不同計算場景的計算引擎了。開源的一般有 Spark、Flink、Hive、Presto、Hive MR 等,這一批計算引擎是可以同時通路同一張資料湖的表的。
基于 Flink+Iceberg 建構企業級實時資料湖

經典業務場景介紹

那麼,Flink 和資料湖結合可以有哪些經典的應用場景呢?這裡我們探讨業務場景時預設選型了 Apache Iceberg 來作為我們的資料湖選型,後面一節會詳細闡述選型背後的理由。

基于 Flink+Iceberg 建構企業級實時資料湖

首先,Flink+Iceberg 最經典的一個場景就是建構實時的 Data Pipeline。業務端産生的大量日志資料,被導入到 Kafka 這樣的消息隊列。運用 Flink 流計算引擎執行 ETL後,導入到 Apache Iceberg 原始表中。有一些業務場景需要直接跑分析作業來分析原始表的資料,而另外一些業務需要對資料做進一步的提純。那麼我們可以再新起一個 Flink 作業從 Apache Iceberg 表中消費增量資料,經過處理之後寫入到提純之後的 Iceberg 表中。此時,可能還有業務需要對資料做進一步的聚合,那麼我們繼續在iceberg 表上啟動增量 Flink 作業,将聚合之後的資料結果寫入到聚合表中。

有人會想,這個場景好像通過 Flink+Hive 也能實作。 Flink+Hive 的确可以實作,但寫入到 Hive 的資料更多地是為了實作數倉的資料分析,而不是為了做增量拉取。一般來說,Hive 的增量寫入以 partition 為機關,時間是 15min 以上,Flink 長期高頻率地寫入會造成 partition 膨脹。而 Iceberg 容許實作 1 分鐘甚至 30秒的增量寫入,這樣就可以大大提高了端到端資料的實時性,上層的分析作業可以看到更新的資料,下遊的增量作業可以讀取到更新的資料。

基于 Flink+Iceberg 建構企業級實時資料湖

第二個經典的場景,就是可以用 Flink+Iceberg 來分析來自 MySQL 等關系型資料庫的 binlog 等。一方面,Apache Flink 已經原生地支援 CDC 資料解析,一條 binlog 資料通過 ververica flink-cdc-connector 拉取之後,自動轉換成 Flink Runtime 能識别的 INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER 四種消息,供使用者做進一步的實時計算。

另外一方面,Apache Iceberg 已經較為完善地實作了 equality delete 功能,也就是使用者定義好待删除的 Record,直接寫到 Apache Iceberg 表内就可以删除對應的行,本身就是為了實作資料湖的流式删除。在 Iceberg 未來的版本中,使用者将不需要設計任何額外的業務字段,不用寫幾行代碼就可以完成 binlog 流式入湖到 Apache Iceberg(社群的這個 Pull Request 已經提供了一個 flink 寫入 CDC 資料的原型)。

此外,CDC 資料成功入湖 Iceberg 之後,我們還會打通常見的計算引擎,例如 Presto、Spark、Hive 等,他們都可以實時地讀取到 Iceberg 表中的最新資料。

基于 Flink+Iceberg 建構企業級實時資料湖

第三個經典場景是近實時場景的流批統一。在常用的 lambda 架構中,我們有一條實時鍊路和一條離線鍊路。實時鍊路一般由 Flink、Kafka、HBase 這些元件建構而成,而離線鍊路一般會用到 Parquet、Spark 等元件建構。這裡面涉及到計算元件和存儲元件都非常多,系統維護成本和業務開發成本都非常高。有很多場景,他們的實時性要求并沒有那麼苛刻,例如可以放松到分鐘級别,這種場景我們稱之為近實時場景。那麼,我們是不是可以通過 Flink + Iceberg 來優化我們常用的 lambda 架構呢?

基于 Flink+Iceberg 建構企業級實時資料湖

我們可以用 Flink+Iceberg 把整個架構優化成上圖所示。實時的資料通過 Flink 寫入到 Iceberg 表中,近實時鍊路依然可以通過flink計算增量資料,離線鍊路也可以通過 flink 批計算讀取某個快照做全局分析,得到對應的分析結果,供不同場景下的使用者讀取和分析。經過這種改進之後,我們把計算引擎統一成了 Flink,把存儲元件統一成了 Iceberg,整個系統的維護開發成本大大降低。

基于 Flink+Iceberg 建構企業級實時資料湖

第四個場景,是采用 Iceberg 全量資料和 Kafka 的增量資料來 Bootstrap 新的 Flink 作業。我們現有的流作業線上上跑着,突然有一天某個業務方跑過來說,他們遇到一個新的計算場景,需要設計一個新的 Flink 作業,跑一遍去年一年的曆史資料,跑完之後再對接到正在産生的 Kafka 增量資料。那麼這時候應該怎麼辦呢?

我們依然可以采用常見的 lambda 架構,離線鍊路通過 kafka->flink->iceberg 同步寫入到資料湖,由于 Kafka 成本較高,保留最近 7 天資料即可,Iceberg 存儲成本較低,可以存儲全量的曆史資料(按照 checkpoint 拆分成多個資料區間)。啟動新 Flink 作業的時候,隻需要去拉 Iceberg 的資料,跑完之後平滑地對接到 kafka 資料即可。

基于 Flink+Iceberg 建構企業級實時資料湖

第五個場景和第四個場景有點類似。同樣是在 lambda 架構下,實時鍊路由于事件丢失或者到達順序的問題,可能導緻流計算端結果不一定完全準确,這時候一般都需要全量的曆史資料來訂正實時計算的結果。而我們的 Iceberg 可以很好地充當這個角色,因為它可以高成本效益地管理好曆史資料。

為什麼選擇 Apache Iceberg

回到上一節遺留的一個問題,為什麼當時 Flink 在衆多開源資料湖項目中會選擇 Apache Iceberg 呢?

基于 Flink+Iceberg 建構企業級實時資料湖

我們當時詳細地調研了 Delta、Hudi、Iceberg 三個開源項目,并寫了一篇調研報告。我們發現 Delta 和 Hudi 跟 Spark 的代碼路徑綁定太深,尤其是寫入路徑。畢竟當時這兩個項目設計之初,都多多少少把 Spark 作為的他們預設的計算引擎了。而Apache Iceberg 的方向非常堅定,宗旨就是要做一個通用化設計的 Table Format。是以,它完美地解耦了計算引擎和底下的存儲系統,便于接入多樣化計算引擎和檔案格式,可以說正确地完成了資料湖架構中的 Table Format 這一層的實作。我們認為它也更容易成為 Table Format 層的開源事實标準。

另外一方面,Apache Iceberg 正在朝着流批一體的資料湖存儲層發展,manifest 和snapshot 的設計,有效地隔離不同 transaction 的變更,非常友善批處理和增量計算。而我們知道 Apache Flink 已經是一個流批一體的計算引擎,可以說這二者的長遠規劃完美比對,未來二者将合力打造流批一體的資料湖架構。

最後,我們還發現 Apache Iceberg 這個項目背後的社群資源非常豐富。在國外, Netflix、Apple、Linkedin、Adobe 等公司都有 PB 級别的生産資料運作在 Apache Iceberg 上;在國内,騰訊這樣的巨頭也有非常龐大的資料跑在 Apache Iceberg 之上,他們最大的一個業務每天有幾十T的增量資料寫入到 Apache Iceberg。社群成員同樣非常資深和多樣化,擁有來自其他項目的 7 位 Apache PMC,1 為 VP。展現在代碼和設計的 review 上,就變得非常苛刻,一個稍微大一點的 PR 涉及 100+ 的comment 很常見。在我個人看來,這些都使得 Apache Iceberg 的設計+代碼品質比較高。

正式基于以上考慮,Apache Flink 最終選擇了 Apache Iceberg 作為第一個資料湖接入項目。

目前,我們已經在 Apache Iceberg 0.10.0 版本上實作 Flink 流批入湖功能,同時還支援 Flink 批作業查詢 Iceberg 資料湖的資料。具體關于 Flink 如何讀寫 Apache Iceberg 表,可以參考 Apache Iceberg 社群的使用文檔,這裡不再贅述。

https://github.com/apache/iceberg/blob/master/site/docs/flink.md

下面來簡要闡述下 Flink iceberg sink 的設計原理:由于 Iceberg 采用樂觀鎖的方式來實作 Transaction 的送出,也就是說兩個人同時送出更改事務到 Iceberg 時,後開始的一方會不斷重試,等先開始的一方順利送出之後再重新讀取 metadata 資訊送出 transaction。考慮到這一點,采用多個并發算子去送出 transaction 是不合适的,容易造成大量事務沖突,導緻重試。

是以,我們把 Flink 寫入流程拆成了兩個算子,一個叫做 IcebergStreamWriter,主要用來寫入記錄到對應的 avro、parquet、orc 檔案,生成一個對應的 Iceberg DataFile,并發送給下遊算子;另外一個叫做 IcebergFilesCommitter,主要用來在 checkpoint 到來時把所有的 DataFile 檔案收集起來,并送出 Transaction 到 Apache iceberg,完成本次 checkpoint 的資料寫入。

基于 Flink+Iceberg 建構企業級實時資料湖

了解了 Flink Sink 算子的設計後,下一個比較重要的問題就是:如何正确地設計兩個算子的 state ?

首先,IcebergStreamWriter 的設計比較簡單,主要任務是把記錄轉換成 DataFile,并沒有複雜的 State 需要設計。IcebergFilesCommitter 相對複雜一點,它為每個checkpointId 維護了一個 DataFile 檔案清單,即 map>,這樣即使中間有某個 checkpoint的transaction 送出失敗了,它的 DataFile 檔案仍然維護在 State 中,依然可以通過後續的 checkpoint 來送出資料到 Iceberg 表中。

社群未來規劃工作等

Apache Iceberg 0.10.0 版本的釋出,已經拉開內建 Flink 和 Iceberg 的序幕。在未來的 Apache Iceberg 0.11.0 和 0.12.0 版本中,我們規劃了更多進階功能及特性。

對于 Apache 0.11.0 版本來說,主要解決兩個問題:

第一個事情是小檔案合并的問題,當然 Apache Iceberg 0.10.0 版本已經支援了Flink 批作業定時去合并小檔案,這個功能還相對較為初級。在 0.11.0 版本中,我們将設計自動合并小檔案功能,簡單來說就是在 Flink checkpoint 到達,觸發 Apache Iceberg transaction 送出後,有一個專門的算子,專門負責處理小檔案的合并工作。

第二個事情是 Flink streaming reader 的開發,目前我們已經在私有倉庫做了一些 PoC 工作,在未來的時間内我們将貢獻到 Apache Iceberg 社群。

對于 0.12.0 版本來說,主要解決 row-level delete 的問題。如前面提到,我們已經在 PR 1663 中實作 Flink UPSERT 更新資料湖的全鍊路打通。後續在社群達成一緻之後,将逐漸推動該功能到社群版本。到時候使用者将能通過 Flink 完成 CDC 資料的實時寫入和分析,也可以友善地把 Flink 的聚合結果 upsert 到 Apache Iceberg 内。

作者介紹:

胡争(子毅),阿裡巴巴技術專家,目前主要負責 Flink 資料湖方案的設計和開發工作,Apache Iceberg 及 Apache Flink 項目的長期活躍貢獻者,《HBase 原理與實踐》作者。

基于 Flink+Iceberg 建構企業級實時資料湖

繼續閱讀