天天看點

Flink內建Iceberg在同程藝龍的實踐

本文由同城藝龍大資料開發工程師張軍分享,主要介紹同城藝龍 Flink 內建 Iiceberg 的生産實踐。内容包括:
  1. 背景及痛點
  2. Flink + Iceberg 的落地
  3. Iceberg 優化實踐
  4. 後續工作
  5. 收益及總結

一、背景及痛點

業務背景

同程藝龍是一個提供機票、住宿、交通等服務的線上旅遊服務平台,目前我所在的部門屬于公司的研發部門,主要職責是為公司内其他業務部門提供一些基礎服務,我們的大資料系統主要承接的業務是部門内的一些大資料相關的資料統計、分析工作等。資料來源有網關日志資料、伺服器監控資料、K8s 容器的相關日志資料,App 的打點日志, MySQL 的 binlog 日志等。我們主要的大資料任務是基于上述日志建構實時報表,提供基于 Presto 的報表展示和即時查詢服務,同時也會基于 Flink 開發一些實時、批處理任務,為業務方提供準确及時的資料支撐。

原架構方案

由于我們所有的原始資料都是存儲在 Kafka 的,是以原來的技術架構就是首先是 Flink 任務消費 Kafka 的資料,經過 Flink SQL 或者 Flink jar 的各種處理之後實時寫入 Hive,其中絕大部分任務都是 Flink SQL 任務,因為我認為 SQL 開發相對代碼要簡單的多,并且維護友善、好了解,是以能用 SQL 寫的都盡量用 SQL 來寫。

送出 Flink 的平台使用的是 Zeppelin,其中送出 Flink SQL 任務是 Zeppelin 自帶的功能,送出 jar 包任務是我自己基于 Application 模式開發的 Zeppelin 插件。

對于落地到 Hive 的資料,使用開源的報表系統 metabase (底層使用 Presto) 提供實時報表展示、定時發送郵件報表,以及自定義 SQL 查詢服務。由于業務對資料的實時性要求比較高,希望資料能盡快的展示出來,是以我們很多的 Flink 流式任務的 checkpoint 設定為 1 分鐘,資料格式采用的是 orc 格式。

痛點

由于采用的是列式存儲格式 ORC,無法像行式存儲格式那樣進行追加操作,是以不可避免的産生了一個大資料領域非常常見且非常棘手的問題,即 HDFS 小檔案問題。

開始的時候我們的小檔案解決方案是自己寫的一個小檔案壓縮工具,定期去合并,我們的 Hive 分區一般都是天級别的,是以這個工具的原理就是每天淩晨啟動一個定時任務去壓縮昨天的資料,首先把昨天的資料寫入一個臨時檔案夾,壓縮完,和原來的資料進行記錄數的比對檢驗,資料條數一緻之後,用壓縮後的資料覆寫原來的資料,但是由于無法保證事務,是以出現了很多問題:

  • 壓縮的同時由于延遲資料的到來導緻昨天的 Hive 分區又有資料寫入了,檢驗就會失敗,導緻合并小檔案失敗。
  • 替換舊資料的操作是沒有事務保證的,如果替換的過程中舊分區有新的資料寫入,就會覆寫新寫入的資料,造成資料丢失。
  • 沒有事務的支援,無法實時合并目前分區的資料,隻能合并壓縮前一個分區的,最新的分區資料仍然有小檔案的問題,導緻最新資料查詢性能提高不了。

二、Flink+Iceberg 的落地

Iceberg 技術調研

是以基于以上的 HDFS 小檔案、查詢慢等問題,結合我們的現狀,我調研了目前市面上的資料湖技術:Delta、Apache Iceberg 和 Apache Hudi,考慮了目前資料湖架構支援的功能和以後的社群規劃,最終我們是選擇了 Iceberg,其中考慮的原因有以下幾方面:

■ Iceberg 深度內建 Flink

前面講到,我們的絕大部分任務都是 Flink 任務,包括批處理任務和流處理任務,目前這三個資料湖架構,Iceberg 是內建 Flink 做的最完善的,如果采用 Iceberg 替代 Hive 之後,遷移的成本非常小,對使用者幾乎是無感覺的,

比如我們原來的 SQL 是這樣的:

INSERT INTO hive_catalog.db.hive_table SELECT * FROM kafka_table           

遷移到 Iceberg 以後,隻需要修改 catalog 就行。

INSERT INTO iceberg_catalog.db.iIcebergceberg_table SELECT * FROM kafka_table           

Presto 查詢也是和這個類似,隻需要修改 catalog 就行了。

■Iceberg 的設計架構使得查詢更快

Flink內建Iceberg在同程藝龍的實踐

在 Iceberg 的設計架構中,manifest 檔案存儲了分區相關資訊、data files 的相關統計資訊(max/min)等,去查詢一些大的分區的資料,就可以直接定位到所要的資料,而不是像 Hive 一樣去 list 整個 HDFS 檔案夾,時間複雜度從 O(n) 降到了 O(1),使得一些大的查詢速度有了明顯的提升,在 Iceberg PMC Chair Ryan Blue 的演講中,我們看到命中 filter 的任務執行時間從 61.5 小時降到了 22 分鐘。

■使用 Flink SQL 将 CDC 資料寫入 Iceberg

Flink CDC 提供了直接讀取 MySQL binlog 的方式,相對以前需要使用 canal 讀取 binlog 寫入 Iceberg,然後再去消費 Iceberg 資料。少了兩個元件的維護,鍊路減少了,節省了維護的成本和出錯的機率。并且可以實作導入全量資料和增量資料的完美對接,是以使用 Flink SQL 将 MySQL binlog 資料導入 Iceberg 來做 MySQL->Iceberg 的導入将會是一件非常有意義的事情。

此外對于我們最初的壓縮小檔案的需求,雖然 Iceberg 目前還無法實作自動壓縮,但是它提供了一個批處理任務,已經能滿足我們的需求。

■Hive 表遷移 Iceberg 表

遷移準備工作

目前我們的所有資料都是存儲在 Hive 表的,在驗證完 Iceberg 之後,我們決定将 Hive 的資料遷移到 Iceberg,是以我寫了一個工具,可以使用 Hive 的資料,然後建立一個 Iceberg 表,為其建立相應的中繼資料,但是測試的時候發現,如果采用這種方式,需要把寫入 Hive 的程式停止,因為如果 Iceberg 和 Hive 使用同一個資料檔案,而壓縮程式會不斷地壓縮 Iceberg 表的小檔案,壓縮完之後,不會馬上删除舊資料,是以 Hive 表就會查到雙份的資料,故我們采用雙寫的政策,原來寫入 Hive 的程式不動,新啟動一套程式寫入 Iceberg,這樣能對 Iceberg 表觀察一段時間。還能和原來 Hive 中的資料進行比對,來驗證程式的正确性。

經過一段時間觀察,每天将近幾十億條資料、壓縮後幾個 T 大小的 Hive 表和 Iceberg 表,一條資料也不差。是以在最終對比資料沒有問題之後,把 Hive 表停止寫入,使用新的 Iceberg 表。

遷移工具

我将這個 Hive 表遷移 Iceberg 表的工具做成了一個基于 Flink batch job 的 Iceberg Action,送出了社群,不過目前還沒合并:

https://github.com/apache/iceberg/pull/2217

。這個功能的思路是使用 Hive 原始的資料不動,然後建立一個 Iceberg table,再為這個新的 Iceberg table 生成對應的中繼資料,大家有需要的話可以先看看。

此外,Iceberg 社群,還有一個把現有的資料遷移到已存在的 Iceberg table 的工具,類似 Hive 的 LOAD DATA INPATH ... INTO TABLE ,是用 Spark 的存儲過程做的,大家也可以關注下:

https://github.com/apache/iceberg/pull/2210

三、Iceberg 優化實踐

壓縮小檔案

目前壓縮小檔案是采用的一個額外批任務來進行的,Iceberg 提供了一個 Spark 版本的 action,我在做功能測試的時候發現了一些問題,此外我對 Spark 也不是非常熟悉,擔心出了問題不好排查,是以參照 Spark 版本的自己實作了一個 Flink 版本,并修複了一些 bug,進行了一些功能的優化。

由于我們的 Iceberg 的中繼資料都是存儲在 Hive 中的,也就是我們使用了 HiveCatalog,是以壓縮程式的邏輯是把 Hive 中所有的 Iceberg 表全部都查出來,依次壓縮。壓縮沒有過濾條件,不管是分區表還是非分區表,都進行全表的壓縮,這樣做是為了處理某些使用 eventtime 的 Flink 任務。如果有延遲的資料的到來,就會把資料寫入以前的分區,如果不是全表壓縮隻壓縮當天分區的話,新寫入的其他天的資料就不會被壓縮。

之是以沒有開啟定時任務來壓縮,是因為比如定時五分鐘壓縮一個表,如果五分鐘之内這個壓縮任務沒完成,沒有送出新的 snapshot,下一個定時任務又開啟了,就會把上一個沒有完成的壓縮任務中的資料重新壓縮一次,是以每個表依次壓縮的政策可以保證某一時刻一個表隻有一個任務在壓縮。

代碼示例參考:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Actions.forTable(env, table) .rewriteDataFiles() //.maxParallelism(parallelism) //.filter(Expressions.equal("day", day)) //.targetSizeInBytes(targetSizeInBytes) .execute();           

目前系統運作穩定,已經完成了幾萬次任務的壓縮。

Flink內建Iceberg在同程藝龍的實踐

注意:

不過目前對于新釋出的 Iceberg 0.11 來說,還有一個已知的 bug,即當壓縮前的檔案大小大于要壓縮的大小(targetSizeInBytes)時,會造成資料丢失,其實這個問題我在最開始測試小檔案壓縮的時候就發現了,并且提了一個 pr,我的政策是大于目标檔案的資料檔案不參與壓縮,不過這個 pr 沒有合并到 0.11 版本中,後來社群另外一個兄弟也發現了相同的問題,送出了一個 pr(

https://github.com/apache/iceberg/pull/2196

) ,政策是将這個大檔案拆分到目标檔案大小,目前已經合并到 master,會在下一個 bug fix 版本 0.11.1 中釋出。

查詢優化

■ 批處理定時任務

目前對于定時排程中的批處理任務,Flink 的 SQL 用戶端還沒 Hive 那樣做的很完善,比如執行 hive-f 來執行一個檔案。而且不同的任務需要不同的資源,并行度等。

是以我自己封裝了一個 Flink 程式,通過調用這個程式來進行處理,讀取一個指定檔案裡面的 SQL,來送出批任務。在指令行控制任務的資源和并行度等。

/home/flink/bin/fFlinklinklink run -p 10 -m yarn-cluster /home/work/iceberg-scheduler.jar my.sql           

■ 優化

批任務的查詢這塊,我做了一些優化工作,比如 limit 下推,filter 下推,查詢并行度推斷等,可以大大提高查詢的速度,這些優化都已經推回給社群,并且在 Iceberg 0.11 版本中釋出。

運維管理

■ 清理 orphan 檔案

  1. 定時任務删除

在使用 Iceberg 的過程中,有時候會有這樣的情況,我送出了一個 Flink 任務,由于各種原因,把它停了,這個時候 Iceberg 還沒送出相應的快照。此外由于一些異常導緻程式失敗,會産生一些不在 Iceberg 中繼資料裡面的孤立的資料檔案,這些檔案對 Iceberg 來說是不可達的,也是沒用的。是以我們需要像 jvm 的垃圾回收一樣來清理這些檔案。

目前 Iceberg 提供了一個 Spark 版本的 action 來處理這些沒用的檔案,我們采取的政策和壓縮小檔案一樣,擷取 Hive 中的所有的 Iceberg 表。每隔一個小時執行一次定時任務來删除這些沒用的檔案。

SparkSession spark = ...... Actions.forTable(spark, table) .removeOrphanFiles() //.deleteWith(...) .execute();           
  1. 踩坑

我們在程式運作過程中出現了正常的資料檔案被删除的問題,經過調研,由于快照保留設定是一小時,這個清理程式清理時間也是設定一個小時,通過日志發現是這個清理程式删除了正常的資料。查了查代碼,應該是設定了一樣的時間,在清理孤立檔案的時候,有其他程式正在讀取要 expired 的 snapshot,導緻删除了正常的資料。最後把這個清理程式的清理時間改成預設的三天,沒有再出現删除資料檔案的問題。

當然,為了保險起見,我們可以覆寫原來的删除檔案的方法,改成将檔案到一個備份檔案夾,檢查沒有問題之後,手工删除。

■ 快照過期處理

我們的快照過期政策,是和壓縮小檔案的批處理任務寫在一起的,壓縮完小檔案之後,進行表的快照過期處理,目前保留的時間是一個小時。這是因為對于有一些比較大的表,分區比較多,而且 checkpoint 比較短,如果保留的快照過長的話,還是會保留過多小檔案,我們暫時沒有查詢曆史快照的需求,是以我将快照的保留時間設定了一個小時。

long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);table.expireSnapshots()// .retainLast(20).expireOlderThan(olderThanTimestamp).commit();           

■ 資料管理

寫入了資料之後,當想檢視相應的快照有多少資料檔案時,直接查詢 Spark 無法知道哪個是有用的,哪個是沒用的。是以需要有對應的管理工具。目前 Flink 這塊還不太成熟,我們可以使用 Spark3 提供的工具來檢視。

  1. DDL

目前 create table 這些操作我們是通過 Flink SQL Client 來做的。其他相關的 DDL 的操作可以使用 Spark 來做:

https://iceberg.apache.org/spark/#ddl-commands
  1. DML

一些相關的資料的操作,比如删除資料等可以通過 MySQL 來實作,Presto 目前隻支援分區級别的删除功能。

  1. show partitions & show create table

在我們操作 Hive 的時候,有一些很常用的操作,比如 show partitions、 show create table 等,這些目前 Flink 還沒有支援,是以在操作 Iceberg 的時候就很不友善,我們自己基于 Flink 1.12 做 了修改,不過目前還沒有完全送出到社群,後續有時間會送出到 Flink 和 Iceberg 社群。

四、後續工作

  • Flink SQL 接入 CDC 資料到 Iceberg

目前在我們内部的版本中,我已經測試通過可以使用 Flink SQL 将 CDC 資料(比如 MySQL binlog)寫入 Iceberg,社群的版本中實作該功能還需要做一些工作,我也送出了一些相關的 PR 來推進這個工作。

  • 使用 SQL 進行删除和更新

對于 copy-on-write 表,我們可以使用 Spark SQL 來進行行級的删除和更新。具體的支援的文法可以參考源碼中的測試類:

org.apache.iceberg.spark.extensions.TestDelete & org.apache.iceberg.spark.extensions.TestUpdate,這些功能我在測試環境測試是可以的,但是還沒有來得及更新到生産。

  • 使用 Flink SQL 進行 streaming read

在工作中會有一些這樣的場景,由于資料比較大,Iceberg 的資料隻存了較短的時間,如果很不幸因為程式寫錯了等原因,想從更早的時間來消費就無能為力了。

當引入了 Iceberg 的 streaming read 之後,這些問題就可以解決了,因為 Iceberg 存儲了所有的資料,當然這裡有一個前提就是對于資料沒有要求特别精确,比如達到秒級别,因為目前 Flink 寫入 Iceberg 的事務送出是基于 Flink Checkpoint 間隔的。

五、收益及總結

經過對 Iceberg 大概一個季度的調研,測試,優化和 bug 修複,我們将現有的 Hive 表都遷移到了 Iceberg,完美解決了原來的所有的痛點問題,目前系統穩定運作,而且相對 Hive 得到了很多的收益:

  • Flink 寫入的資源減少

舉一個例子,預設配置下,原來一個 flink 讀取 kafka 寫入 hive 的任務,需要60個并行度才不會讓 Kafka 産生積壓。改成寫入 iceberg 之後,隻需要20個并行度就夠了。

  • 查詢速度變快

前面我們講到 Iceberg 查詢的時候不會像 Hive 一樣去 list 整個檔案夾來擷取分區資料,而是先從 manifest 檔案中擷取相關資料,查詢的性能得到了顯著的提升,一些大的報表的查詢速度從 50 秒提高到 30 秒。

  • 并發讀寫

由于 Iceberg 的事務支援,我們可以實作對一個表進行并發讀寫,Flink 流式資料實時入湖,壓縮程式同時壓縮小檔案,清理過期檔案和快照的程式同時清理無用的檔案,這樣就能更及時的提供資料,做到分鐘級的延遲,查詢最新分區資料的速度大大加快了,并且由于 Iceberg 的 ACID 特性可以保證資料的準确性。

  • time travel

可以回溯查詢以前某一時刻的資料。

總結一下,我們目前可以實作使用 Flink SQL 對 Iceberg 進行批、流的讀寫,并可以對小檔案進行實時的壓縮,使用 Spark SQL 做一些 delete 和 update 工作以及一些 DDL 操作,後續可以使用 Flink SQL 将 CDC 的資料寫入 Iceberg。目前對 Iceberg 的所有的優化和 bug fix,我已經貢獻給社群。由于筆者水準有限,有時候也難免有錯誤,還請大家不吝賜教。

作者介紹:

張軍,同程藝龍大資料開發工程師

活動推薦:

僅需99元即可體驗阿裡雲基于 Apache Flink 建構的企業級産品-實時計算 Flink 版!點選下方連結了解活動詳情:

https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506
Flink內建Iceberg在同程藝龍的實踐