内容簡要:
一、資料倉庫架構更新的背景
二、基于 Iceberg 的湖倉一體架構實踐
三、總結與收益
四、後續規劃
GitHub 位址
https://github.com/apache/flink歡迎大家給 Flink 點贊送 star~
1. 基于 Hive 的資料倉庫的痛點
原有的資料倉庫完全基于 Hive 建造而成,主要存在三大痛點:
痛點一:不支援 ACID
1)不支援 Upsert 場景;
2)不支援 Row-level delete,資料修正成本高。
痛點二:時效性難以提升
1)資料難以做到準實時可見;
2)無法增量讀取,無法實作存儲層面的流批統一;
3)無法支援分鐘級延遲的資料分析場景。
痛點三:Table Evolution
1)寫入型 Schema,對 Schema 變更支援不好;
2)Partition Spec 變更支援不友好。
2. Iceberg 關鍵特性
Iceberg 主要有四大關鍵特性:支援 ACID 語義、增量快照機制、開放的表格式和流批接口支援。
- 支援 ACID 語義
- 不會讀到不完整的 Commit;
- 基于樂觀鎖支援并發 Commit;
- Row-level delete,支援 Upsert。
- 增量快照機制
- Commit 後資料即可見(分鐘級);
- 可回溯曆史快照。
- 開放的表格式
- 資料格式:parquet、orc、avro
- 計算引擎:Spark、Flink、Hive、Trino/Presto
- 流批接口支援
- 支援流、批寫入;
- 支援流、批讀取。
湖倉一體的意義就是說我不需要看見湖和倉,資料有着打通的中繼資料的格式,它可以自由的流動,也可以對接上層多樣化的計算生态。
——賈揚清(阿裡雲計算平台進階研究員)
1. Append 流入湖的鍊路

上圖為日志類資料入湖的鍊路,日志類資料包含用戶端日志、使用者端日志以及服務端日志。這些日志資料會實時錄入到 Kafka,然後通過 Flink 任務寫到 Iceberg 裡面,最終存儲到 HDFS。
2. Flink SQL 入湖鍊路打通
我們的 Flink SQL 入湖鍊路打通是基于 “Flink 1.11 + Iceberg 0.11” 完成的,對接 Iceberg Catalog 我們主要做了以下内容:
1)Meta Server 增加對 Iceberg Catalog 的支援;
2)SQL SDK 增加 Iceberg Catalog 支援。
然後在這基礎上,平台開放 Iceberg 表的管理功能,使得使用者可以自己在平台上建 SQL 的表。
3. 入湖 - 支援代理使用者
第二步是内部的實踐,對接現有預算體系、權限體系。
因為之前平台做實時作業的時候,平台都是預設為 Flink 使用者去運作的,之前存儲不涉及 HDFS 存儲,是以可能沒有什麼問題,也就沒有思考預算劃分方面的問題。
但是現在寫 Iceberg 的話,可能就會涉及一些問題。比如數倉團隊有自己的集市,資料就應該寫到他們的目錄下面,預算也是劃到他們的預算下,同時權限和離線團隊賬号的體系打通。
如上所示,這塊主要是在平台上做了代理使用者的功能,使用者可以去指定用哪個賬号去把這個資料寫到 Iceberg 裡面,實作過程主要有以下三個。
-
增加 Table 級别配置:'iceberg.user.proxy' = 'targetUser’
1)啟用 Superuser
2)團隊賬号鑒權
汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐 - 通路 HDFS 時啟用代理使用者:
汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐 -
通路 Hive Metastore 時指定代理使用者
1)參考 Spark 的相關實作:
org.apache.spark.deploy.security.HiveDelegationTokenProvider
2)動态代理 HiveMetaStoreClient,使用代理使用者通路 Hive metastore
4. Flink SQL 入湖示例
DDL + DML
5. CDC 資料入湖鍊路
如上所示,我們有一個 AutoDTS 平台,負責業務庫資料的實時接入。我們會把這些業務庫的資料接入到 Kafka 裡面,同時它還支援在平台上配置分發任務,相當于把進 Kafka 的資料分發到不同的存儲引擎裡,在這個場景下是分發到 Iceberg 裡。
6. Flink SQL CDC 入湖鍊路打通
下面是我們基于 “Flink1.11 + Iceberg 0.11” 支援 CDC 入湖所做的改動:
-
改進 Iceberg Sink:
Flink 1.11 版本為 AppendStreamTableSink,無法處理 CDC 流,修改并适配。
-
表管理
1)支援 Primary key(PR1978)
2)開啟 V2 版本:'iceberg.format.version' = '2'
7. CDC 資料入湖
1. 支援 Bucket
Upsert 場景下,需要確定同一條資料寫入到同一 Bucket 下,這又如何實作?
目前 Flink SQL 文法不支援聲明 bucket 分區,通過配置的方式聲明 Bucket:
'partition.bucket.source'='id', // 指定 bucket 字段
'partition.bucket.num'='10', // 指定 bucket 數量
2. Copy-on-write sink
做 Copy-on-Write 的原因是原本社群的 Merge-on-Read 不支援合并小檔案,是以我們臨時去做了 Copy-on-write sink 的實作。目前業務一直在測試使用,效果良好。
上方為 Copy-on-Write 的實作,其實跟原來的 Merge-on-Read 比較類似,也是有 StreamWriter 多并行度寫入和 FileCommitter 單并行度順序送出。
在 Copy-on-Write 裡面,需要根據表的資料量合理設定 Bucket 數,無需額外做小檔案合并。
-
StreamWriter 在 snapshotState 階段多并行度寫入
1)增加 Buffer;
2)寫入前需要判斷上次 checkpoint 已經 commit 成功;
3)按 bucket 分組、合并,逐個 Bucket 寫入。
-
FileCommitter 單并行度順序送出
1)table.newOverwrite()
2)Flink.last.committed.checkpoint.id
汽車之家:基于 Flink + Iceberg 的湖倉一體架構實踐
8. 示例 - CDC 資料配置入湖
如上圖所示,在實際使用中,業務方可以在 DTS 平台上建立或配置分發任務即可。
執行個體類型選擇 Iceberg 表,然後選擇目标庫,表明要把哪個表的資料同步到 Iceberg 裡,然後可以選原表和目标表的字段的映射關系是什麼樣的,配置之後就可以啟動分發任務。啟動之後,會在實時計算平台 Flink 裡面送出一個實時任務,接着用 Copy-on-write sink 去實時地把資料寫到 Iceberg 表裡面。
9. 入湖其他實踐
實踐一:減少 empty commit
-
問題描述:
在上遊 Kafka 長期沒有資料的情況下,每次 Checkpoint 依舊會生成新的 Snapshot,導緻大量的空檔案和不必要的 Snapshot。
-
解決方案(PR - 2042):
增加配置 Flink.max-continuousempty-commits,在連續指定次數 Checkpoint 都沒有資料後才真正觸發 Commit,生成 Snapshot。
實踐二:記錄 watermark
- 目前 Iceberg 表本身無法直接反映資料寫入的進度,離線排程難以精準觸發下遊任務。
-
解決方案( PR - 2109 ):
在 Commit 階段将 Flink 的 Watermark 記錄到 Iceberg 表的 Properties 中,可直覺的反映端到端的延遲情況,同時可以用來判斷分區資料完整性,用于排程觸發下遊任務。
實踐三:删表優化
- 删除 Iceberg 可能會很慢,導緻平台接口相應逾時。因為 Iceberg 是面向對象存儲來抽象 IO 層的,沒有快速清除目錄的方法。
-
解決方案:
擴充 FileIO,增加 deleteDir 方法,在 HDFS 上快速删除表資料。
10. 小檔案合并及資料清理
定期為每個表執行批處理任務(spark 3),分為以下三個步驟:
1. 定期合并新增分區的小檔案:
rewriteDataFilesAction.execute(); 僅合并小檔案,不會删除舊檔案。
2. 删除過期的 snapshot,清理中繼資料及資料檔案:
table.expireSnapshots().expireOld erThan(timestamp).commit();
3. 清理 orphan 檔案,預設清理 3 天前,且無法觸及的檔案:
removeOrphanFilesAction.older Than(timestamp).execute();
11. 計算引擎 – Flink
Flink 是實時平台的核心計算引擎,目前主要支援資料入湖場景,主要有以下幾個方面的特點。
-
資料準實時入湖:
Flink 和 Iceberg 在資料入湖方面內建度最高,Flink 社群主動擁抱資料湖技術。
-
平台內建:
AutoStream 引入 IcebergCatalog,支援通過 SQL 建表、入湖 AutoDTS 支援将 MySQL、SQLServer、TiDB 表配置入湖。
-
流批一體:
在流批一體的理念下,Flink 的優勢會逐漸展現出來。
12. 計算引擎 – Hive
Hive 在 SQL 批處理層面 Iceberg 和 Spark 3 內建度更高,主要提供以下三個方面的功能。
-
定期小檔案合并及 meta 資訊查詢:
SELECT * FROM prod.db.table.history 還可檢視 snapshots, files, manifests。
-
離線資料寫入:
1)Insert into 2)Insert overwrite 3)Merge into
-
分析查詢:
主要支援日常的準實時分析查詢場景。
13. 計算引擎 – Trino/Presto
AutoBI 已經和 Presto 內建,用于報表、分析型查詢場景。
-
Trino
1)直接将 Iceberg 作為報表資料源
2)需要增加中繼資料緩存機制:
https://github.com/trinodb/trino/issues/7551 -
Presto
社群內建中:
https://github.com/prestodb/presto/pull/15836
14. 踩過的坑
1. 通路 Hive Metastore 異常
問題描述:HiveConf 的構造方法的誤用,導緻 Hive 用戶端中聲明的配置被覆寫,導緻通路 Hive metastore 時異常
解決方案(PR-2075):修複 HiveConf 的構造,顯示調用 addResource 方法,確定配置不會被覆寫:hiveConf.addResource(conf);
2.Hive metastore 鎖未釋放
問題描述:“CommitFailedException: Timed out after 181138 ms waiting for lock xxx.” 原因是 hiveMetastoreClient.lock 方法,在未獲得鎖的情況下,也需要顯示 unlock,否則會導緻上面異常。
解決方案(PR-2263):優化 HiveTableOperations#acquireLock 方法,在擷取鎖失敗的情況下顯示調用 unlock 來釋放鎖。
3. 中繼資料檔案丢失
問題描述:Iceberg 表無法通路,報 “NotFoundException Failed to open input stream for file : xxx.metadata.json”
解決方案(PR-2328):當調用 Hive metastore 更新 iceberg 表的 metadata_location 逾時後,增加檢查機制,确認中繼資料未儲存成功後再删除中繼資料檔案。
三、收益與總結
1. 總結
通過對湖倉一體、流批融合的探索,我們分别做了總結。
-
湖倉一體
1)Iceberg 支援 Hive Metastore;
2)總體使用上與 Hive 表類似:相同資料格式、相同的計算引擎。
-
流批融合
準實時場景下實作流批統一:同源、同計算、同存儲。
2. 業務收益
-
資料時效性提升:
入倉延遲從 2 小時以上降低到 10 分鐘以内;算法核心任務 SLA 提前 2 小時完成。
-
準實時的分析查詢:
結合 Spark 3 和 Trino,支援準實時的多元分析查詢。
-
特征工程提效:
提供準實時的樣本資料,提高模型訓練時效性。
-
CDC 資料準實時入倉:
可以在數倉針對業務表做準實時分析查詢。
3. 架構收益 - 準實時數倉
上方也提到了,我們支援準實時的入倉和分析,相當于是為後續的準實時數倉建設提供了基礎的架構驗證。準實時數倉的優勢是一次開發、口徑統一、統一存儲,是真正的批流一體。劣勢是實時性較差,原來可能是秒級、毫秒級的延遲,現在是分鐘級的資料可見性。
但是在架構層面上,這個意義還是很大的,後續我們能看到一些希望,可以把整個原來 “T + 1” 的數倉,做成準實時的數倉,提升數倉整體的資料時效性,然後更好地支援上下遊的業務。
1. 跟進 Iceberg 版本
全面開放 V2 格式,支援 CDC 資料的 MOR 入湖。
2. 建設準實時數倉
基于 Flink 通過 Data pipeline 模式對數倉各層表全面提速。
3. 流批一體
随着 upsert 功能的逐漸完善,持續探索存儲層面流批一體。
4. 多元分析
基于 Presto/Spark3 輸出準實時多元分析。
更多 Flink 相關技術交流,可掃碼加入社群釘釘大群~
活動推薦
阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟6月限時活動:
0元試用
實時計算Flink版(包年包月、10CU)即可有機會獲得 Flink 獨家定制T恤;另包3個月及以上還有85折優惠!
了解活動詳情:
https://www.aliyun.com/product/bigdata/sc