天天看點

汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐

内容簡要:

一、資料倉庫架構更新的背景

二、基于 Iceberg 的湖倉一體架構實踐

三、總結與收益

四、後續規劃

GitHub 位址

https://github.com/apache/flink

歡迎大家給 Flink 點贊送 star~

1. 基于 Hive 的資料倉庫的痛點

原有的資料倉庫完全基于 Hive 建造而成,主要存在三大痛點:

痛點一:不支援 ACID

1)不支援 Upsert 場景;

2)不支援 Row-level delete,資料修正成本高。

痛點二:時效性難以提升

1)資料難以做到準實時可見;

2)無法增量讀取,無法實作存儲層面的流批統一;

3)無法支援分鐘級延遲的資料分析場景。

痛點三:Table Evolution

1)寫入型 Schema,對 Schema 變更支援不好;

2)Partition Spec 變更支援不友好。

2. Iceberg 關鍵特性

Iceberg 主要有四大關鍵特性:支援 ACID 語義、增量快照機制、開放的表格式和流批接口支援。

  • 支援 ACID 語義
    • 不會讀到不完整的 Commit;
    • 基于樂觀鎖支援并發 Commit;
    • Row-level delete,支援 Upsert。
  • 增量快照機制
    • Commit 後資料即可見(分鐘級);
    • 可回溯曆史快照。
  • 開放的表格式
    • 資料格式:parquet、orc、avro
    • 計算引擎:Spark、Flink、Hive、Trino/Presto
  • 流批接口支援
    • 支援流、批寫入;
    • 支援流、批讀取。

湖倉一體的意義就是說我不需要看見湖和倉,資料有着打通的中繼資料的格式,它可以自由的流動,也可以對接上層多樣化的計算生态。

——賈揚清(阿裡雲計算平台進階研究員)

1. Append 流入湖的鍊路

汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐

上圖為日志類資料入湖的鍊路,日志類資料包含用戶端日志、使用者端日志以及服務端日志。這些日志資料會實時錄入到 Kafka,然後通過 Flink 任務寫到 Iceberg 裡面,最終存儲到 HDFS。

2. Flink SQL 入湖鍊路打通

我們的 Flink SQL 入湖鍊路打通是基于 “Flink 1.11 + Iceberg 0.11” 完成的,對接 Iceberg Catalog 我們主要做了以下内容:

1)Meta Server 增加對 Iceberg Catalog 的支援;

2)SQL SDK 增加 Iceberg Catalog 支援。

然後在這基礎上,平台開放 Iceberg 表的管理功能,使得使用者可以自己在平台上建 SQL 的表。

3. 入湖 - 支援代理使用者

第二步是内部的實踐,對接現有預算體系、權限體系。

因為之前平台做實時作業的時候,平台都是預設為 Flink 使用者去運作的,之前存儲不涉及 HDFS 存儲,是以可能沒有什麼問題,也就沒有思考預算劃分方面的問題。

但是現在寫 Iceberg 的話,可能就會涉及一些問題。比如數倉團隊有自己的集市,資料就應該寫到他們的目錄下面,預算也是劃到他們的預算下,同時權限和離線團隊賬号的體系打通。

汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐

如上所示,這塊主要是在平台上做了代理使用者的功能,使用者可以去指定用哪個賬号去把這個資料寫到 Iceberg 裡面,實作過程主要有以下三個。

  • 增加 Table 級别配置:'iceberg.user.proxy' = 'targetUser’

    1)啟用 Superuser

    2)團隊賬号鑒權

    汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐
  • 通路 HDFS 時啟用代理使用者:
    汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐
  • 通路 Hive Metastore 時指定代理使用者

    1)參考 Spark 的相關實作:

    org.apache.spark.deploy.security.HiveDelegationTokenProvider

    2)動态代理 HiveMetaStoreClient,使用代理使用者通路 Hive metastore

4. Flink SQL 入湖示例

DDL + DML

汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐

5. CDC 資料入湖鍊路

汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐

如上所示,我們有一個 AutoDTS 平台,負責業務庫資料的實時接入。我們會把這些業務庫的資料接入到 Kafka 裡面,同時它還支援在平台上配置分發任務,相當于把進 Kafka 的資料分發到不同的存儲引擎裡,在這個場景下是分發到 Iceberg 裡。

6. Flink SQL CDC 入湖鍊路打通

下面是我們基于 “Flink1.11 + Iceberg 0.11” 支援 CDC 入湖所做的改動:

  • 改進 Iceberg Sink:

    Flink 1.11 版本為 AppendStreamTableSink,無法處理 CDC 流,修改并适配。

  • 表管理

    1)支援 Primary key(PR1978)

    2)開啟 V2 版本:'iceberg.format.version' = '2'

7. CDC 資料入湖

1. 支援 Bucket

Upsert 場景下,需要確定同一條資料寫入到同一 Bucket 下,這又如何實作?

目前 Flink SQL 文法不支援聲明 bucket 分區,通過配置的方式聲明 Bucket:

'partition.bucket.source'='id', // 指定 bucket 字段

'partition.bucket.num'='10', // 指定 bucket 數量

2. Copy-on-write sink

做 Copy-on-Write 的原因是原本社群的 Merge-on-Read 不支援合并小檔案,是以我們臨時去做了 Copy-on-write sink 的實作。目前業務一直在測試使用,效果良好。

汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐

上方為 Copy-on-Write 的實作,其實跟原來的 Merge-on-Read 比較類似,也是有 StreamWriter 多并行度寫入和 FileCommitter 單并行度順序送出。

在 Copy-on-Write 裡面,需要根據表的資料量合理設定 Bucket 數,無需額外做小檔案合并。

  • StreamWriter 在 snapshotState 階段多并行度寫入

    1)增加 Buffer;

    2)寫入前需要判斷上次 checkpoint 已經 commit 成功;

    3)按 bucket 分組、合并,逐個 Bucket 寫入。

  • FileCommitter 單并行度順序送出

    1)table.newOverwrite()

    2)Flink.last.committed.checkpoint.id

    汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐

8. 示例 - CDC 資料配置入湖

汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐

如上圖所示,在實際使用中,業務方可以在 DTS 平台上建立或配置分發任務即可。

執行個體類型選擇 Iceberg 表,然後選擇目标庫,表明要把哪個表的資料同步到 Iceberg 裡,然後可以選原表和目标表的字段的映射關系是什麼樣的,配置之後就可以啟動分發任務。啟動之後,會在實時計算平台 Flink 裡面送出一個實時任務,接着用 Copy-on-write sink 去實時地把資料寫到 Iceberg 表裡面。

汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐

9. 入湖其他實踐

實踐一:減少 empty commit

  • 問題描述:

    在上遊 Kafka 長期沒有資料的情況下,每次 Checkpoint 依舊會生成新的 Snapshot,導緻大量的空檔案和不必要的 Snapshot。

  • 解決方案(PR - 2042):

    增加配置 Flink.max-continuousempty-commits,在連續指定次數 Checkpoint 都沒有資料後才真正觸發 Commit,生成 Snapshot。

實踐二:記錄 watermark

  • 目前 Iceberg 表本身無法直接反映資料寫入的進度,離線排程難以精準觸發下遊任務。
  • 解決方案( PR - 2109 ):

    在 Commit 階段将 Flink 的 Watermark 記錄到 Iceberg 表的 Properties 中,可直覺的反映端到端的延遲情況,同時可以用來判斷分區資料完整性,用于排程觸發下遊任務。

實踐三:删表優化

  • 删除 Iceberg 可能會很慢,導緻平台接口相應逾時。因為 Iceberg 是面向對象存儲來抽象 IO 層的,沒有快速清除目錄的方法。
  • 解決方案:

    擴充 FileIO,增加 deleteDir 方法,在 HDFS 上快速删除表資料。

10. 小檔案合并及資料清理

定期為每個表執行批處理任務(spark 3),分為以下三個步驟:

1. 定期合并新增分區的小檔案:

​ rewriteDataFilesAction.execute(); 僅合并小檔案,不會删除舊檔案。

2. 删除過期的 snapshot,清理中繼資料及資料檔案:

​ table.expireSnapshots().expireOld erThan(timestamp).commit();

3. 清理 orphan 檔案,預設清理 3 天前,且無法觸及的檔案:

​ removeOrphanFilesAction.older Than(timestamp).execute();

11. 計算引擎 – Flink

Flink 是實時平台的核心計算引擎,目前主要支援資料入湖場景,主要有以下幾個方面的特點。

  • 資料準實時入湖:

    Flink 和 Iceberg 在資料入湖方面內建度最高,Flink 社群主動擁抱資料湖技術。

  • 平台內建:

    AutoStream 引入 IcebergCatalog,支援通過 SQL 建表、入湖 AutoDTS 支援将 MySQL、SQLServer、TiDB 表配置入湖。

  • 流批一體:

    在流批一體的理念下,Flink 的優勢會逐漸展現出來。

12. 計算引擎 – Hive

Hive 在 SQL 批處理層面 Iceberg 和 Spark 3 內建度更高,主要提供以下三個方面的功能。

  • 定期小檔案合并及 meta 資訊查詢:

    SELECT * FROM prod.db.table.history 還可檢視 snapshots, files, manifests。

  • 離線資料寫入:

    1)Insert into 2)Insert overwrite 3)Merge into

  • 分析查詢:

    主要支援日常的準實時分析查詢場景。

13. 計算引擎 – Trino/Presto

AutoBI 已經和 Presto 內建,用于報表、分析型查詢場景。

14. 踩過的坑

1. 通路 Hive Metastore 異常

問題描述:HiveConf 的構造方法的誤用,導緻 Hive 用戶端中聲明的配置被覆寫,導緻通路 Hive metastore 時異常

解決方案(PR-2075):修複 HiveConf 的構造,顯示調用 addResource 方法,確定配置不會被覆寫:hiveConf.addResource(conf);

2.Hive metastore 鎖未釋放

問題描述:“CommitFailedException: Timed out after 181138 ms waiting for lock xxx.” 原因是 hiveMetastoreClient.lock 方法,在未獲得鎖的情況下,也需要顯示 unlock,否則會導緻上面異常。

解決方案(PR-2263):優化 HiveTableOperations#acquireLock 方法,在擷取鎖失敗的情況下顯示調用 unlock 來釋放鎖。

3. 中繼資料檔案丢失

問題描述:Iceberg 表無法通路,報 “NotFoundException Failed to open input stream for file : xxx.metadata.json”

解決方案(PR-2328):當調用 Hive metastore 更新 iceberg 表的 metadata_location 逾時後,增加檢查機制,确認中繼資料未儲存成功後再删除中繼資料檔案。

三、收益與總結

1. 總結

​ 通過對湖倉一體、流批融合的探索,我們分别做了總結。

  • 湖倉一體

    1)Iceberg 支援 Hive Metastore;

    2)總體使用上與 Hive 表類似:相同資料格式、相同的計算引擎。

  • 流批融合

    準實時場景下實作流批統一:同源、同計算、同存儲。

2. 業務收益

  • 資料時效性提升:

    入倉延遲從 2 小時以上降低到 10 分鐘以内;算法核心任務 SLA 提前 2 小時完成。

  • 準實時的分析查詢:

    結合 Spark 3 和 Trino,支援準實時的多元分析查詢。

  • 特征工程提效:

    提供準實時的樣本資料,提高模型訓練時效性。

  • CDC 資料準實時入倉:

    可以在數倉針對業務表做準實時分析查詢。

3. 架構收益 - 準實時數倉

汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐

上方也提到了,我們支援準實時的入倉和分析,相當于是為後續的準實時數倉建設提供了基礎的架構驗證。準實時數倉的優勢是一次開發、口徑統一、統一存儲,是真正的批流一體。劣勢是實時性較差,原來可能是秒級、毫秒級的延遲,現在是分鐘級的資料可見性。

但是在架構層面上,這個意義還是很大的,後續我們能看到一些希望,可以把整個原來 “T + 1” 的數倉,做成準實時的數倉,提升數倉整體的資料時效性,然後更好地支援上下遊的業務。

1. 跟進 Iceberg 版本

全面開放 V2 格式,支援 CDC 資料的 MOR 入湖。

2. 建設準實時數倉

基于 Flink 通過 Data pipeline 模式對數倉各層表全面提速。

3. 流批一體

随着 upsert 功能的逐漸完善,持續探索存儲層面流批一體。

4. 多元分析

基于 Presto/Spark3 輸出準實時多元分析。

更多 Flink 相關技術交流,可掃碼加入社群釘釘大群~

汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐

活動推薦

阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟6月限時活動:

0元試用

實時計算Flink版

(包年包月、10CU)即可有機會獲得 Flink 獨家定制T恤;另包3個月及以上還有85折優惠!

了解活動詳情:

https://www.aliyun.com/product/bigdata/sc
汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐

繼續閱讀