天天看點

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

本文由李勁松、胡争分享,社群志願者楊偉海、李培殿整理。主要介紹在資料湖的架構中,CDC 資料實時讀寫的方案和原理。文章主要分為 4 個部分内容:

  1. 常見的 CDC 分析方案
  2. 為何選擇 Flink + Iceberg
  3. 如何實時寫入讀取
  4. 未來規劃

一、常見的 CDC 分析方案

我們先看一下今天的 topic 需要設計的是什麼?輸入是一個 CDC 或者 upsert 的資料,輸出是 Database 或者是用于大資料 OLAP 分析的存儲。

我們常見的輸入主要有兩種資料,第一種資料是資料庫的 CDC 資料,不斷的産生 changeLog;另一種場景是流計算産生的 upsert 資料,在最新的 Flink 1.12 版本已經支援了 upsert 資料。

1.1 離線 HBase 叢集分析 CDC 資料

我們通常想到的第一個方案,就是把 CDC upsert 的資料通過 Flink 進行一些處理之後,實時的寫到 HBase 當中。HBase 是一個線上的、能提供線上點查能力的一種資料庫,具有非常高的實時性,對寫入操作是非常友好的,也可以支援一些小範圍的查詢,而且叢集可擴充。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

這種方案其實跟普通的點查實時鍊路是同一套,那麼用 HBase 來做大資料的 OLAP 的查詢分析有什麼問題呢?

首先,HBase 是一個面向點查設計的一種資料庫,是一種線上服務,它的行存的索引不适合分析任務。典型的數倉設計肯定是要列存的,這樣壓縮效率和查詢效率才會高。第二,HBase 的叢集維護成本比較高。最後,HBase 的資料是 HFile,不友善與大資料裡數倉當中典型的 Parquet、Avro、Orc 等結合。

1.2 Apache Kudu 維護 CDC 資料集

針對 HBase 分析能力比較弱的情況,社群前幾年出現了一個新的項目,這就是 Apache Kudu 項目。Kudu 項目擁有 HBase 的點查能力的同時,采用列存,這樣列存加速非常适合 OLAP 分析。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

這種方案會有什麼問題呢?

首先 Kudu 是比較小衆的、獨立的叢集,維護成本也比較高,跟 HDFS、S3、OSS 比較割裂。其次由于 Kudu 在設計上保留了點查能力,是以它的批量掃描性能不如 parquet,另外 Kudu 對于 delete 的支援也比較弱,最後它也不支援增量拉取。

1.3 直接導入 CDC 到 Hive 分析

第三種方案,也是大家在數倉中比較常用的方案,就是把 MySQL 的資料寫到 Hive,流程是:維護一個全量的分區,然後每天做一個增量的分區,最後把增量分區寫好之後進行一次 Merge ,寫入一個新的分區,流程上這樣是走得通的。Hive 之前的全量分區是不受增量的影響的,隻有當增量 Merge 成功之後,分區才可查,才是一個全新的資料。這種純列存的 append 的資料對于分析是非常友好的。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

增量資料和全量資料的 Merge 是有延時的,資料不是實時寫入的,典型的是一天進行一次 Merge,這就是 T+1 的資料了。是以,時效性很差,不支援實時 upsert。每次 Merge 都需要把所有資料全部重讀重寫一遍,效率比較差、比較浪費資源。

1.4 Spark + Delta 分析 CDC 資料

針對這個問題,Spark + Delta 在分析 CDC 資料的時候提供了 MERGE INTO 的文法。這并不僅僅是對 Hive 數倉的文法簡化,Spark + Delta 作為新型資料湖的架構(例如 Iceberg、Hudi),它對資料的管理不是分區,而是檔案,是以 Delta 優化 MERGE INTO 文法,僅掃描和重寫發生變化的檔案即可,是以高效很多。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

我們評估一下這個方案,他的優點是僅依賴 Spark + Delta 架構簡潔、沒有線上服務、列存,分析速度非常快。優化之後的 MERGE INTO 文法速度也夠快。

這個方案,業務上是一個 Copy On Write 的一個方案,它隻需要 copy 少量的檔案,可以讓延遲做的相對低。理論上,在更新的資料跟現有的存量沒有很大重疊的話,可以把天級别的延遲做到小時級别的延遲,性能也是可以跟得上的。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

這個方案在 Hive 倉庫處理 upsert 資料的路上已經前進了一小步了。但小時級别的延遲畢竟不如實時更有效,是以這個方案最大的缺點在 Copy On Write 的 Merge 有一定的開銷,延遲不能做的太低。

第一部分大概現有的方案就是這麼多,同時還需要再強調一下,upsert 之是以如此重要,是因為在資料湖的方案中,upsert 是實作資料庫準實時、實時入湖的一個關鍵技術點。

二、為何選擇 Flink + Iceberg

2.1 Flink 對 CDC 資料消費的支援

第一,Flink 原生支援 CDC 資料消費。在前文 Spark + Delta 的方案中,MARGE INTO 的文法,使用者需要感覺 CDC 的屬性概念,然後寫到 merge 的文法上來。但是 Flink 是原生支援 CDC 資料的。使用者隻要聲明一個 Debezium 或者其他 CDC 的 format,Flink 上面的 SQL 是不需要感覺任何 CDC 或者 upsert 的屬性的。Flink 中内置了 hidden column 來辨別它 CDC 的類型資料,是以對使用者而言比較簡潔。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

如下圖示例,在 CDC 的處理當中,Flink 在隻用聲明一個 MySQL Binlog 的 DDL 語句,後面的 select 都不用感覺 CDC 屬性。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

2.2 Flink 對 Change Log Stream 的支援

下圖介紹的是 Flink 原生支援 Change Log Stream,Flink 在接入一個 Change Log Stream 之後,拓撲是不用關心 Change Log flag 的 SQL。拓撲完全是按照自己業務邏輯來定義,并且一直到最後寫入 Iceberg,中間不用感覺 Change Log 的 flag。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

2.3 Flink + Iceberg CDC 導入方案評估

最後,Flink + Iceberg 的 CDC 導入方案的優點是什麼?

對比之前的方案,Copy On Write 跟 Merge On Read 都有适用的場景,側重點不同。Copy On Write 在更新部分檔案的場景中,當隻需要重寫其中的一部分檔案時是很高效的,産生的資料是純 append 的全量資料集,在用于資料分析的時候也是最快的,這是 Copy On Write 的優勢。

另外一個是 Merge On Read,即将資料連同 CDC flag 直接 append 到 Iceberg 當中,在 merge 的時候,把這些增量的資料按照一定的組織格式、一定高效的計算方式與全量的上一次資料進行一次 merge。這樣的好處是支援近實時的導入和實時資料讀取;這套計算方案的 Flink SQL 原生支援 CDC 的攝入,不需要額外的業務字段設計。

Iceberg 是統一的資料湖存儲,支援多樣化的計算模型,也支援各種引擎(包括 Spark、Presto、hive)來進行分析;産生的 file 都是純列存的,對于後面的分析是非常快的;Iceberg 作為資料湖基于 snapshot 的設計,支援增量讀取;Iceberg 架構足夠簡潔,沒有線上服務節點,純 table format 的,這給了上遊平台方足夠的能力來定制自己的邏輯和服務化。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

三、如何實時寫入讀取

3.1 批量更新場景和 CDC 寫入場景

首先我們來了解一下在整個資料湖裡面批量更新的兩個場景。

  • 第一批量更新的這種場景,在這個場景中我們使用一個 SQL 更新了成千上萬行的資料,比如歐洲的 GDPR 政策,當一個使用者登出掉自己的賬戶之後,背景的系統是必須将這個使用者所有相關的資料全部實體删除。
  • 第二個場景是我們需要将 date lake 中一些擁有共同特性的資料删除掉,這個場景也是屬于批量更新的一個場景,在這個場景中删除的條件可能是任意的條件,跟主鍵(Primary key)沒有任何關系,同時這個待更新的資料集是非常大,這種作業是一個長耗時低頻次的作業。

另外是 CDC 寫入的場景,對于對 Flink 來說,一般常用的有兩種場景,第一種場景是上遊的 Binlog 能夠很快速的寫到 data lake 中,然後供不同的分析引擎做分析使用; 第二種場景是使用 Flink 做一些聚合操作,輸出的流是 upsert 類型的資料流,也需要能夠實時的寫到資料湖或者是下遊系統中去做分析。如下圖示例中 CDC 寫入場景中的 SQL 語句,我們使用單條 SQL 更新一行資料,這種計算模式是一種流式增量的導入,而且屬于高頻的更新。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

3.2 Apache Iceberg 設計 CDC 寫入方案需要考慮的問題

接下來我們看下 iceberg 對于 CDC 寫入這種場景在方案設計時需要考慮哪些問題。

  • 第一是正确性,即需要保證語義及資料的正确性,如上遊資料 upsert 到 iceberg 中,當上遊 upsert 停止後, iceberg 中的資料需要和上遊系統中的資料保持一緻。
  • 第二是高效寫入,由于 upsert 的寫入頻率非常高,我們需要保持高吞吐、高并發的寫入。
  • 第三是快速讀取,當資料寫入後我們需要對資料進行分析,這其中涉及到兩個問題,第一個問題是需要支援細粒度的并發,當作業使用多個 task 來讀取時可以保證為各個 task 進行均衡的配置設定以此來加速資料的計算;第二個問題是我們要充分發揮列式存儲的優勢來加速讀取。
  • 第四是支援增量讀,例如一些傳統數倉中的 ETL,通過增量讀取來進行進一步資料轉換。
Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

3.3 Apache Iceberg Basic

在介紹具體的方案細節之前,我們先了解一下 Iceberg 在檔案系統中的布局,總體來講 Iceberg 分為兩部分資料,第一部分是資料檔案,如下圖中的 parquet 檔案,每個資料檔案對應一個校驗檔案(.crc檔案)。第二部分是表中繼資料檔案(Metadata 檔案),包含 Snapshot 檔案(snap-.avro)、Manifest 檔案(.avro)、TableMetadata 檔案(*.json)等。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

下圖展示了在 iceberg 中 snapshot、manifest 及 partition 中的檔案的對應關系。下圖中包含了三個 partition,第一個 partition 中有兩個檔案 f1、f3,第二個 partition 有兩個檔案f4、f5,第三個 partition 有一個檔案f2。對于每一次寫入都會生成一個 manifest 檔案,該檔案記錄本次寫入的檔案與 partition 的對應關系。再向上層有 snapshot 的概念,snapshot 能夠幫助快速通路到整張表的全量資料,snapshot 記錄多個 manifest,如第二個 snapshot 包含 manifest2 和 manifest3。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

3.4 INSERT、UPDATE、DELETE 寫入

在了解了基本的概念,下面介紹 iceberg 中 insert、update、delete 操作的設計。

下圖示例的 SQL 中展示的表包含兩個字段即 id、data,兩個字段都是 int 類型。在一個 transaction 中我們進行了圖示中的資料流操作,首先插入了(1,2)一條記錄,接下來将這條記錄更新為(1,3),在 iceberg 中 update 操作将會拆為 delete 和 insert 兩個操作。

這麼做的原因是考慮到 iceberg 作為流批統一的存儲層,将 update 操作拆解為 delete 和 insert 操作可以保證流批場景做更新時讀取路徑的統一,如在批量删除的場景下以 Hive 為例,Hive 會将待删除的行的檔案 offset 寫入到 delta 檔案中,然後做一次 merge on read,因為這樣會比較快,在 merge 時通過 position 将原檔案和 delta 進行映射,将會很快得到所有未删除的記錄。

接下來又插入記錄(3,5),删除了記錄(1,3),插入記錄(2,5),最終查詢是我們得到記錄(3,5)(2,5)。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

上面操作看上去非常簡單,但在實作中是存在一些語義上的問題。如下圖中,在一個 transaction 中首先執行插入記錄(1,2)的操作,該操作會在 data file1 檔案中寫入 INSERT(1,2),然後執行删除記錄(1,2)操作,該操作會在 equalify delete file1 中寫入 DELETE(1,2),接着又執行插入記錄(1,2)操作,該操作會在 data file1 檔案中再寫入INSERT(1,2),然後執行查詢操作。

在正常情況下查詢結果應該傳回記錄 INSERT(1,2),但在實作中,DELETE(1,2)操作無法得知删除的是 data file1 檔案中的哪一行,是以兩行 INSERT(1,2)記錄都将被删除。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

那麼如何來解決這個問題呢,社群目前的方式是采用了 Mixed position-delete and equality-delete。Equality-delete 即通過指定一列或多列來進行删除操作,position-delete 是根據檔案路徑和行号來進行删除操作,通過将這兩種方法結合起來以保證删除操作的正确性。

如下圖我們在第一個 transaction 中插入了三行記錄,即 INSERT(1,2)、INSERT(1,3)、INSERT(1,4),然後執行 commit 操作進行送出。接下來我們開啟一個新的 transaction 并執行插入一行資料(1,5),由于是新的 transaction,是以建立了一個 data file2 并寫入 INSERT(1,5)記錄,接下來執行删除記錄(1,5),實際寫入 delete 時是:

在 position delete file1 檔案寫入(file2, 0),表示删除 data file2 中第 0 行的記錄,這是為了解決同一個 transaction 内同一行資料反複插入删除的語義的問題。

在 equality delete file1 檔案中寫入 DELETE (1,5),之是以寫入這個 delete 是為了確定本次 txn 之前寫入的 (1,5) 能被正确删除。

然後執行删除(1,4)操作,由于(1,4)在目前 transaction 中未曾插入過,是以該操作會使用 equality-delete 操作,即在 equality delete file1 中寫入(1,4)記錄。在上述流程中可以看出在目前方案中存在 data file、position delete file、equality delete file 三類檔案。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

在了解了寫入流程後,如何來讀取呢。如下圖所示,對于 position delete file 中的記錄(file2, 0)隻需和目前 transaction 的 data file 進行 join 操作,對于 equality delete file 記錄(1,4)和之前的 transaction 中的 data file 進行 join 操作。最終得到記錄 INSERT(1,3)、INSERT(1,2)保證了流程的正确性。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

3.5 Manifest 檔案的設計

上面介紹了 insert、update 及 delete,但在設計 task 的執行計劃時我們對 manifest 進行了一些設計,目的是通過 manifest 能夠快速到找到 data file,并按照資料大小進行分割,保證每個 task 處理的資料盡可能的均勻分布。

如下圖示例,包含四個 transaction,前兩個 transaction 是 INSERT 操作,對應 M1、M2,第三個 transaction 是 DELETE 操作,對應 M3,第四個 transaction 是 UPDATE 操作,包含兩個 manifest 檔案即 data manifest 和 delete manifest。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

對于為什麼要對 manifest 檔案拆分為 data manifest 和 delete manifest 呢,本質上是為了快速為每個 data file 找到對應的 delete file 清單。可以看下圖示例,當我們在 partition-2 做讀取時,需要将 deletefile-4 與datafile-2、datafile-3 做一個 join 操作,同樣也需要将 deletefile-5 與 datafile-2、datafile-3 做一個 join 操作。

以 datafile-3 為例,deletefile 清單包含 deletefile-4 和 deletefile-5 兩個檔案,如何快速找到對應的 deletefIle 清單呢,我們可以根據上層的 manifest 來進行查詢,當我們将 manifest 檔案拆分為 data manifest 和 delete manifest 後,可以将 M2(data manifest)與 M3、M4(delete manifest)先進行一次 join 操作,這樣便可以快速的得到 data file 所對應的 delete file 清單。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

3.6 檔案級别的并發

另一個問題是我們需要保證足夠高的并發讀取,在 iceberg 中這點做得非常出色。在 iceberg 中可以做到檔案級别的并發讀取,甚至檔案中更細粒度的分段的并發讀取,比如檔案有 256MB,可以分為兩個 128MB 進行并發讀取。這裡舉例說明,假設 insert 檔案跟 delete 檔案在兩個 Bucket 中的布局方式如下圖所示。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

我們通過 manifest 對比發現,datafile-2 的 delete file 清單隻有 deletefile-4,這樣可以将這兩個檔案作為一個單獨的 task(圖示中Task-2)進行執行,其他的檔案也是類似,這樣可以保證每個 task 資料較為均衡的進行 merge 操作。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

對于這個方案我們做了簡單的總結,如下圖所示。首先這個方案的優點可以滿足正确性,并且可以實作高吞吐寫入和并發高效的讀取,另外可以實作 snapshot 級别的增量的拉取。

目前該方案還是比較粗糙,下面也有一些可以優化的點。

  • 第一點,如果同一個 task 内的 delete file 有重複可以做緩存處理,這樣可以提高 join 的效率。
  • 第二點,當 delete file 比較大需要溢寫到磁盤時可以使用 kv lib 來做優化,但這不依賴外部服務或其他繁重的索引。
  • 第三點,可以設計 Bloom filter(布隆過濾器)來過濾無效的 IO,因為對于 Flink 中常用的 upsert 操作會産生一個 delete 操作和一個 insert 操作,這會導緻在 iceberg 中 data file 和 delete file 大小相差不大,這樣 join 的效率不會很高。如果采用 Bloom Filter,當 upsert 資料到來時,拆分為 insert 和 delete 操作,如果通過 bloom filter 過濾掉那些之前沒有 insert 過資料的 delete 操作(即如果這條資料之前沒有插入過,則不需要将 delete 記錄寫入到 delete file 中),這将極大的提高 upsert 的效率。
  • 第四點,是需要一些背景的 compaction 政策來控制 delete file 檔案大小,當 delete file 越少,分析的效率越高,當然這些政策并不會影響正常的讀寫。
Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

3.7 增量檔案集的 Transaction 送出

前面介紹了檔案的寫入,下圖我們介紹如何按照 iceberg 的語義進行寫入并且供使用者讀取。主要分為資料和 metastore 兩部分,首先會有 IcebergStreamWriter 進行資料的寫入,但此時寫入資料的中繼資料資訊并沒有寫入到 metastore,是以對外不可見。第二個算子是 IcebergFileCommitter,該算子會将資料檔案進行收集, 最終通過 commit transaction 來完成寫入。

在 Iceberg 中并沒有其他任何其他第三方服務的依賴,而 Hudi 在某些方面做了一些 service 的抽象,如将 metastore 抽象為獨立的 Timeline,這可能會依賴一些獨立的索引甚至是其他的外部服務來完成。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料

四、未來規劃

下面是我們未來的一些規劃,首先是 Iceberg 核心的一些優化,包括方案中涉及到的全鍊路穩定性測試及性能的優化, 并提供一些 CDC 增量拉取的相關 Table API 接口。

在 Flink 內建上,會實作 CDC 資料的自動和手動合并資料檔案的能力,并提供 Flink 增量拉取 CDC 資料的能力。

在其他生态內建上,我們會對 Spark、Presto 等引擎進行內建,并借助 Alluxio 加速資料查詢。

Flink 如何實時分析 Iceberg 資料湖的 CDC 資料