天天看點

深度內建 Flink: Apache Iceberg 0.11.0 最新功能解讀

在 2021 年 1 月 27 日,Apache Iceberg 釋出了 0.11.0 版本[1]。在這個版本中,實作了以下核心功能:

1、Apache Iceberg 在 Core API 層面支援了 partition 的變更;同時還在 Iceberg Format v2 之上新增了 SortOrder 規範,主要用于将那些散列度較高的 column 聚集在少數幾個檔案内,這樣可以大量減少小檔案的數量。同時提高讀取的效率,因為資料通過 sort 寫入後,檔案級别和 Page 級别的 min-max 範圍将更小,有助于高效的資料過濾。

2、在 Flink 和 Iceberg 的內建方面,社群實作了以下目标:

  • 實作了 Flink Streaming Reader,意味着我們可以通過 Flink 流作業增量地去拉取 Apache Iceberg 中新增資料。對 Apache Iceberg 這樣流批統一的存儲層來說,Apache Flink 是真正意義上第一個實作了流批讀寫 Iceberg 的計算引擎,這也标志着 Apache Flink 和 Apache Iceberg 在共同打造流批統一的資料湖架構上開啟了新的篇章。
  • 實作了 Flink Streaming/Batch Reader 的 limit pushdown 和 filter pushdown。
  • 實作了 CDC 和 Upsert 事件通過 flink 計算引擎寫入 Apache Iceberg,并在中等資料規模上完成了正确性驗證。
  • 在 Flink Iceberg Sink 中支援 write.distribution-mode=hash 的方式寫入資料,這可以從生産源頭上大量減少小檔案。

3、在 Spark3 和 Iceberg 的內建方面,社群支援了大量高階 SQL:

  • MERGE INTO
  • DELETE FROM
  • ALTER TABLE ... ADD/DROP PARTITION
  • ALTER TABLE ... WRITE ORDERED BY
  • 通過 Call 方式來執行更多的資料管理操作,例如合并小檔案、清理過期檔案等。

4、在周邊生态內建方面,社群實作了以下目标:

  • 引入 AWS module,完成和 AWS S3[2] 以及 Glue Catalog[3] 等雲服務的內建;
  • 內建流行的開源 catalog 服務 nessie[4]。

在接下來的内容裡,我将說明 Apache Iceberg 0.11.0 在 Apache Flink 內建方面做的一些具體工作。

Apache Flink流式讀取

在 Apache Iceberg 0.10.0 版本中,我們已經在 Flink SQL 層面支援了:

  1. 流作業寫入 Apache Iceberg 表;
  2. 批作業寫入 Apache Iceberg 表;
  3. 批作業讀取 Apache Iceberg 表;

在最新的 Apache Iceberg 0.11.0 版本中,我們又成功內建了 Flink 流作業讀取 Apache Iceberg 表。有了這個功能,可以很友善地實作不同 Iceberg 表之間的資料流轉和 ETL。例如我們有一個原始表 A,需要把表 A 通過一些資料處理或者打寬,處理成一個表 B,那麼這個場景是很适合用 Apache Iceberg 的 Streaming Reader 來實作的。

深度內建 Flink: Apache Iceberg 0.11.0 最新功能解讀

除此之外,Netflix 也提出他們在采用 Flink Streaming Reader 來實作曆史資料的 backfill 和 boostrap。當然,這需要未來 iceberg 內建到 FLIP-27,目前 Netflix 提供了他們對這塊工作的一些實踐經驗[5]和設計工作[6],大家感興趣可以參考一下。

目前,對這個功能我們提供了 Flink SQL 和 DataStream API 兩種使用方式(推薦采用 Flink SQL)。您可以通過閱讀文檔[7]來啟動 Flink SQL 用戶端,然後通過如下方式來啟動流作業通路 Apache Iceberg 的增量資料:

-- Submit the flink job in streaming mode for current session.
SET execution.type = streaming ;

-- Enable this switch because streaming read SQL will provide few job options in flink SQL hint options.
SET table.dynamic-table-options.enabled=true;

-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;           

Flink Source 的 Limit Pushdown 和 Filter Pushdown

在 Flink 的 Batch Source 和 Streaming Source 中,我們對接了 Limit 操作和 Filter 操作跟 Iceberg 表的下推實作。這意味着,在讀取 Apache Iceberg 表時,碰到這樣的 SQL:

SELECT * FROM sample LIMIT 10;           

我們可以在存儲層面就完成資料過濾,而不需要把資料從存儲層面讀取出來,再丢給計算引擎。進而大大提高資料的通路效率。

Filter 的下推也是類似,目前我們支援了如下 Filter 的下推,幾乎包含了所有常見 filter 的下推操作:

SELECT * FROM sample WHERE data = 'a';
SELECT * FROM sample WHERE data != 'a';
SELECT * FROM sample WHERE data >= 'a';
SELECT * FROM sample WHERE data <= 'a';
SELECT * FROM sample WHERE data < 'a';
SELECT * FROM sample WHERE data > 'a';
SELECT * FROM sample WHERE data = 'a' AND id = 1;
SELECT * FROM sample WHERE data = 'a' OR id = 1;
SELECT * FROM sample WHERE data IS NULL;
SELECT * FROM sample WHERE NOT (id = 1);
SELECT * FROM sample WHERE data LIKE 'aaa%';           

對 CDC(例如 MySQL Binlog)和 Upsert 事件的支援

這個功能是 Apache Flink 社群使用者呼聲特别高的一個功能,主要來自兩個核心場景的需求:

  1. 使用者希望把來自關系型資料庫的 binlog 導入到 Apache Iceberg 資料湖中,提供近實時的資料分析能力。
  2. 希望把 Flink 流作業 AGG 産生的 upsert stream 導入到 Apache Iceberg 資料湖中,進而借助 Apache Iceberg 的存儲能力和 Apache Flink 的分析能力,提供近實時的資料報表。

通常來說,我們能選的開源方案各有不足:選擇采用 Hive MR 則隻能提供 T+1 的資料時效性;采用 Apache Kudu 則必須面臨跟 HDFS 和雲端對象存儲脫節的尴尬;選擇 HBase 則面臨行存導緻分析能力不足的問題;選擇 Spark+delta 則無法充分利用 Apache Flink 在流計算領域的優勢。那麼,在 Apache Iceberg 的實作中,這些問題将有望解決。

我們把 flink+iceberg 對 CDC/Upsert 工作的內建大緻分成了兩個階段:

  • 第一階段,是指 Flink 可以順利地把 CDC 和 Upsert 的資料成功寫入到 Apache Iceberg,并能讀取到一個正确的結果;
  • 第二階段,是指 Flink+Iceberg 能順利通過較大資料量的穩定性測試和性能測試,保證整條鍊路的穩定性和性能,進而達到可以上生産的水準。

那麼,目前我們在 0.11.0 版本中,已經實作了第一階段的目标,流作業已經能夠成功地将 CDC/Upsert 資料寫入到 Apache Iceberg 中,國内的小夥伴例如汽車之家和 B 站已經幫忙完成中等資料量的正确性驗證。

在未來的 Apache Iceberg 0.12.0 版本中,我們規劃了一系列的性能和穩定性相關事情,0.12.0 版本将會是 Iceberg CDC/Upsert 功能達到 Production Ready 的一個标志性版本。

支援 write.distribution-mode=hash 方式寫入 Apache Iceberg

在 Flink 流作業寫檔案系統的資料檔案時,非常容易碰到小檔案的問題。這是因為如果 source 端的資料,不經過任何 shuffle 或者 cluster,就寫入到 partition,很容易導緻每個 Task 寫了大量的 Partition 和 Bucket。這樣對一個 Partition 來說,就存在多個 Task 寫入,每個 Task 至少産生一個檔案。而在 Apache Iceberg 這種資料湖架構中,Flink 的每一次 checkpoint,都将 Roll over file writer 以便送出 txn,那麼随着分鐘級别的 checkpoint 送出,一定會産生大量的小檔案。

目前在 Apache Iceberg 中,将提供 3 中方式來解決小檔案問題:

1、在 Iceberg 表中設定 write.distribution-mode=hash 屬性,例如:

CREATE TABLE sample (
    id BIGINT,
    data STRING
) PARTITIONED BY (data) WITH (
    'write.distribution-mode'='hash'
);           

這樣可以保證每一條記錄按照 partition key 做 shuffle 之後再寫入,每一個 Partition 最多由一個 Task 來負責寫入,大大地減少了小檔案的産生。但是,這很容易産生另外一個問題,就是資料傾斜的問題。很多業務表都是按照時間字段來做分區的,而産生的新資料都是按照時間寫入的,容易導緻新資料都寫入同一個 partition,造成寫入資料熱點。目前我們推薦的做法是,在 partition 下面采用 hash 的方式設定 bucket,那麼每一個 partition 的資料将均勻地落到每個 bucket 内,每一個 bucket 最多隻會由一個 task 來寫,既解決了小檔案問題,又解決了資料熱點問題。

在 Flink 1.11 版本暫時不支援通過 SQL 的方式建立 bucket,但我們可以通過 Java API 的方式将上述按照 data 字段 partition 之後的表添加 bucket。調用方式如下:

table.updateSpec()
       .addField(Expressions.bucket("id", 32))
       .commit();           

2、定期對 Apache Iceberg 表執行 Major Compaction 來合并 Apache iceberg 表中的小檔案。這個作業目前是一個 Flink 的批作業,提供 Java API 的方式來送出作業,使用姿勢可以參考文檔[8]。

3、在每個 Flink Sink 流作業之後,外挂算子用來實作小檔案的自動合并。這個功能目前暫未 merge 到社群版本,由于涉及到 format v2 的 compaction 的一些讨論,我們會在 0.12.0 版本中釋出該功能。

總結

自 Apache Flink 接入 Apache Iceberg 以來,社群已經成功地釋出了兩個版本。在這兩個版本中,我們已經成功地實作 Flink+Iceberg 的流批讀寫能力。

到目前為止,Flink+Iceberg 在國内外已經有不少成功的上線案例:

  • 騰訊内部每天都有大量的日志資料通過 Flink 清洗處理後導入到 Iceberg,最大的表日新增幾十 TB;
  • Netflix 則将公司内幾乎所有的使用者行為資料通過 Flink 流計算導入到 Iceberg,最終存儲在 AWS S3 之上,相比 HDFS 的方式, Flink+Iceberg 幫助他們公司節省大量的存儲成本;
  • 同程藝龍也在 Flink+Iceberg 之上做了大量探索,之前幾乎所有的分析資料都存儲在 Hive 上,鑒于 Hive 在 ACID 和曆史回溯等方面能力不足,他們調研了 Iceberg,發現 Iceberg 非常适合替換他們的 Hive 存儲格式,又由于上層計算生态的良好對接,幾乎所有的曆史計算作業都不需要做改動,就能友善地切換 Hive 表到 Iceberg 之上。到目前為止同程藝龍已經完成了幾十張 Hive 表到 Iceberg 表的遷移;
  • 汽車之家也是成功在生産環境大量替換 Hive 表為 Iceberg 表的公司之一,同時他們也是最早采用社群版 Iceberg 做 CDC 和 Upsert 資料分析 PoC 的公司,也非常期待未來 0.12.0 對 CDC 和 Upsert 場景的更多優化。
深度內建 Flink: Apache Iceberg 0.11.0 最新功能解讀

在未來的 Apache Iceberg 0.12.0 版本中,我們規劃了上圖的核心功能。本質上我們将實作 Flink+Iceberg 對 CDC 及 Upsert 場景的更好支援,将在穩定性、性能、易用性三個方面做更多的優化工作。

最後,我想聊一下 Apache Iceberg 在計算生态方面的現狀。

随着 Apache Iceberg 0.11.0 新版的釋出,Apache Iceberg 作為一個統一通用的資料湖 Table Format,在生态內建方面的優勢愈發明顯。由于在 Table Format 層面對計算引擎無偏袒,計算引擎的內建呈現出百花齊放的姿态,大資料生态内幾乎所有主流計算引擎都跟 Iceberg 有着不同程度的對接:

  1. Netflix、騰訊和 Apple 幾家公司的貢獻者主力推動 Spark+Iceberg 的內建,騰訊、Netflix 和 Apple 在 Apache Spark 社群有着多位 Spark PMC 和 Spark Committer,在 Spark 社群和 Iceberg 社群的影響力有目共睹。我個人樂觀地判斷,Apache Iceberg 和 Spark 的內建體驗,未來有望比肩 Databricks delta 的商業版體驗,大家可以期待下。
  2. 阿裡巴巴 Flink 團隊、Netflix 以及國内外龐大的 Flink 使用者群在不斷地推動 Flink+Iceberg 的內建,不再贅述;
  3. AWS Presto 團隊以及 Trino 團隊則在不斷推動着 Presto 和 Iceberg 的內建,AWS Presto 團隊已經明确将 Iceberg 選型為他們的資料湖 table format。同時,也可以非常明顯地看到,AWS 團隊在 Iceberg 和 S3 以及 Glue 生态打通方面做的大量工作,Apache Iceberg 已經成為 AWS 資料湖生态中相當重要的一環。
  4. Cloudera 已經明确地選型 Apache Iceberg 來建構他們的商業版資料湖。使用過 Hadoop 的同學一定不會對這家公司陌生,沒錯,這家公司就是 Hadoop 商業發行版做的最為出色的公司之一。未來,他們将基于 Apache Iceberg 推出公有雲服務,将給使用者帶來完善的 Flink、Spark、Hive、Impala 資料湖內建體驗。這裡重點說一下 Apache Impala,Cloudera 在互動式分析場景下非常倚重自家開源的 Apache Impala(事實上,在大資料基準測試下 Impala 的性能表現的确要比 Presto 更好),Apache Iceberg 對存儲層較為完美的抽象和對多樣化計算引擎的包容,是成功打動 Cloudera 選型 Apache Iceberg 最核心的理由之一。

更多關于 Flink 資料湖的讨論,請掃描下方釘群二維碼,加入資料湖技術交流釘釘群。我們會定期在群裡釋出 Apache Iceberg/Hudi 和 Flink 內建的最新進展,我們也非常歡迎大家積極讨論相關話題。

另外阿裡雲 Flink 團隊也一直在尋求大資料計算和資料湖存儲方向的人才,這裡既有豐富的應用場景等你來挑戰,又有相對靈活的空間參與開源社群提升個人影響力。感興趣的同學可以直接聯系:[email protected]

深度內建 Flink: Apache Iceberg 0.11.0 最新功能解讀

參考連結:

[1]

https://lists.apache.org/x/thread.html/rfa2be6bb85c0cae38ccedcf5c2d8fbfe192bdfccd58ee500e44e665e@%3Cdev.iceberg.apache.org%3E

[2]

https://aws.amazon.com/cn/s3/

[3]

https://aws.amazon.com/cn/glue/

[4]

https://projectnessie.org/

[5]

https://www.youtube.com/watch?v=rtz3p_iijP8&ab_channel=NetflixData

[6]

https://docs.google.com/document/d/1q6xaBxUPFwYsW9aXWxYUh7die6O7rDeAPFQcTAMQ0GM/edit?ts=601316b0

[7]

https://github.com/apache/iceberg/blob/master/site/docs/flink.md#preparation

[8]

https://github.com/apache/iceberg/blob/master/site/docs/flink.md#rewrite-files-action

作者簡介:

胡争(子毅),Apache Iceberg Committer,Apache HBase PMC 成員,阿裡巴巴技術專家。目前主要負責 Flink 資料湖方案的設計和開發工作,Apache Iceberg 及 Apache Flink 項目的長期活躍貢獻者,《HBase 原理與實踐》作者。

深度內建 Flink: Apache Iceberg 0.11.0 最新功能解讀