天天看點

資料湖系列(1) - Hudi 核心功能原理剖析

随着網際網路業務的逐漸成熟,數倉和模型訓練的基本盤逐漸穩固,越來越多的工程師從業務開發需求轉移到了工程的架構更新,而常用的 Hudi 和 Iceberg 往往會成為替代 Hive/Hdfs 等架構更新的選型。

概要

網上關于 Hudi 和 Iceberg 對比的内容有很多,比如 Iceberg 對 Schema 友好,Hudi 支援 Upsert 等優劣點的對比,這些内容很大程度上已經過時,在未來的幾個月内,我們就能看到大部分關鍵功能在兩個架構上的打平,是以非常有必要相對全面地了解兩個架構的背景、設計思想、功能細節等。

本文針對 Hudi 的機制做了相對較全面的梳理,由于具體内容不涉及到源碼的具體細節,很多并不熟悉資料湖的同學也可以在這裡了解到其技術上的全貌和亮點。

Apache Hudi

先說背景,Hudi(Hadoop Upsert anD Incremental),從 Uber 内部孵化出來的開源項目,最初是用于解決數倉中 Lambda 架構中資料一緻性的問題,将增量處理模型替代流式處理模型,并提供了 Upsert 和 Incremental Pull 兩個非常重要的 feature。

這裡可以提一下,Hudi 内部主打的一個場景,就是乘客打車下單和司機接單的比對,乘客和司機分别是兩條資料流,通過 Hudi 的 Upsert 能力和增量讀取功能,可以分鐘級地講這兩條資料流進行拼接,得到乘客-司機的比對資料。

(還記得兩年前 Hudi 對自身的定位是一個 Pipeline 或者存儲架構,現在官網的描述已經變成了「下一代的流式資料湖平台」,繼商業化之後,想象空間也上升了好幾個 Level。)

基本概念

Timeline

Timeline 可以了解為 Hudi 表的一個時間線,記錄了 Hudi 表在不同時刻的資訊和行為,這個 Timeline 由 TimelineServer 來管理,通常存在于 Hdfs、RDBMS 等持久化存儲媒體中。實際上,Hudi 将 Timeline 資訊放到每個 Table 内的 .hoodie 目錄中,并通過檔案名來進行不同 instant 的區分。通過 Timeline 可以友善地做版本管理以及實作增量處理等和版本/時間相關的功能。

Timeline 涉及到 3 個關鍵概念:

•action: 目前時刻的動作(類似 commit、rollback 等)•time: 目前時刻的時間點,毫秒級别•state: 目前動作的狀态

可以看到,所有需要更改表元資訊的操作,都是需要将對應的 action 送出至 Timeline,而 Timeline 的操作要保證原子性,一般要由單點進行操作,比如 Hudi 在與 Spark/Flink 結合時,利用 Spark 的 Driver 和 Flink 的 JobMaster 來進行 Timeline 的資訊記錄。

Table Types & Query Types

Hudi 提供了兩種表類型,分别為 Copy-on-Write 和 Merge-on-Read,其對應的查詢類型如下:

對于 Copy-On-Write Table,使用者的 update 會重寫資料所在的檔案,是以是一個寫放大很高,但是讀放大為 0,适合寫少讀多的場景。對于這種 Table,提供了兩種查詢:

•Snapshot Query: 查詢最近一次 snapshot 的資料,也就是最新的資料。•Incrementabl Query:使用者需要指定一個 commit time,然後 Hudi 會掃描檔案中的記錄,過濾出 commit_time > 使用者指定的 commit time 的記錄。

具體的流程見下圖 gif:

資料湖系列(1) - Hudi 核心功能原理剖析

Copy On Write Table

對于 Merge-On-Read Table,整體的結構有點像 LSM-Tree,使用者的寫入先寫入到 delta data 中,這部分資料使用行存,這部分 delta data 可以手動 merge 到存量檔案中,整理為 parquet 的列存結構。對于這類 Table,提供了三種查詢:

•Snapshot Query: 查詢最近一次 snapshot 的資料,也就是最新的資料。這裡是一個行列資料混合的查詢。•Incrementabl Query:使用者需要指定一個 commit time,然後 Hudi 會掃描檔案中的記錄,過濾出 commit_time > 使用者指定的 commit time 的記錄。這裡是一個行列資料混合的查詢。•Read Optimized Query: 隻查存量資料,不查增量資料,因為使用的都是列式檔案格式,是以效率較高。

具體的流程見下圖 gif:

資料湖系列(1) - Hudi 核心功能原理剖析

Merge On Read Table

MOR 表中可能存在兩種檔案,在 Hudi 内部被稱為 base file 和 log file,其中 base file 通常為 parquet 檔案,列存格式,對讀取友好,log file 通常為 avro 格式,行存,對寫入友好。

Index

Hudi 表有主鍵的概念,是以 Index 的出現也非常合理,可以用于定位資料的位置以提供更高效的寫入和讀取操作,不同的 Index 類型提供了不同的粒度:

•Bloom Index•Simple Index•HBase Index•Hash Index

對于每條 Record,我們會查詢/計算 Record 的主鍵所在索引的方式,來判斷是 Insert 還是 Update,以及對應的舊檔案的位置。在實時寫入的過程中,Index 的查詢是最關鍵的部分之一,索引設計的高效與否直接決定了資料寫入的性能和穩定性。(可以之後專門出一篇文章來寫這個内容)

File Layouts

示意圖如下所示,由外到裡分别是:

•Table•Partition•FileGroup(由 FileGroupId 或者 FileID 作為辨別符):Partition 由多個 FileGroup 組成,每個 FileGroup 包括一個 base file 和多個 log file•base file 和 log file:詳見上面對 MOR 表的闡述

資料湖系列(1) - Hudi 核心功能原理剖析

hudi-file-layouts

核心功能機制

Upsert

上面提到,Upsert 的操作和 Index 類型很相關,但是在 Hudi 内部有趣的是,由于初始架構設計的缺陷(并未考慮非 Spark 場景),導緻了不同 Connector 在使用 Index 上有非常大的差異。

Hudi 目前支援 Spark 和 Flink,而我們這裡也以這兩種計算引擎為例,講解一下 Upsert 具體的實作機制:

•Spark:1.對重複 PK 的資料進行 dedup,如果 Payload 實作了 preCombine 方法,對于相同 PK 的資料會調用 preCombine 進行 Payload 的合并;如果沒有實作,則使用 Hudi 周遊時相同 PK 的第一條資料2.調用 Index 的 tagLocation 方法,根據 PK 查詢到表中已存在資料的位置,并記錄 fileId 和 commitTs(用于定位具體的位置),對于沒有找到的資料,暫時将 location 置為空3.根據 Partition 進行計數,計算每個 Partition 需要寫入的資料條數,并生成對應的 WorkloadStat(即每個 Partition 對應的 insert 和 update 數量),并根據已有的檔案分布(比如優先把新資料寫入資料量小的 FileGroup)決定每個 FileGroup 寫入的資料條數4.給每條資料配置設定 FileGroup 位置,對于 UPDATE 資料,直接使用 Index 中擷取的位置;對于 INSERT 資料,基于上一步驟的結果進行随機配置設定;将資料按照不同的 FileGroup 位置進行 partitionBy 操作,使得 Spark 的每個 Partition 都隻處理一個 FileGroup 的資料5.将 Spark partition 的資料進行寫入,寫入成功後傳回每條資料的 location,并逐條更新 Index,更新成功後将此次資料寫入 commit 到 Hudi 的 Timeline•Flink:1.作業開始一個新的 Checkpoint 後,相對應的會開啟一個 Hudi 的新 instant,兩次成功的 checkpoint 之間就是一次 Hudi 的 instant 資料寫入事件2.将資料按 PK 進行 keyBy 操作,保證相同 PK 資料都在同樣的 Task 中被處理3.利用 Flink State 來存儲每個 PK 對應的位置,對于 UPDATE 資料,直接通過通路 State 來擷取對應的 FileGroup 位置;對于 INSERT 資料,和 Spark 類似,通過已有的檔案分布資訊來決定插入的位置4.根據 FileGroup Id 進行 keyBy 操作,保證相同位置的資料都在同樣的 Task 中被處理5.寫入資料的 Task 以 batch size 的形式緩存和寫入資料,并在 Checkpoint 時将寫入成功的資料元資訊發送給 JobMaster 進行 commit

Bulk Insert

Bulk Insert 的操作比較簡單,隻用于某個 Partition 或者某個 Table 初次初始化時使用,由于沒有 Update 操作,是以隻需要考慮 Insert 情況,性能相比 Upsert 有非常大的提升。

增量讀取

這裡可以看一下 Hudi 官方的 Incremental Query 的示例[1]

// spark-shell
// reload data
spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")


val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in


// incrementally query data
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")


spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
           

每個 Hudi 表中都有一個隐藏列叫 _hoodie_commit_time,類型是時間戳,表示資料 commit 的時間,在增量讀取時,我們通過指定 commit 的時間戳進行增量的範圍定義。在 Hudi 内部,通過使用者指定的 beginTime 時間戳,對 Timeline 上的 Instant 進行篩選,得到 beginTime 後續的 Instant 範圍,并找到對應的資料寫入的 Metadata,來得到需要進行掃描的檔案。如果遇到 Compaction 的情況,則在掃描時會對 Compaction 後的資料明細進行時間範圍的過濾。

其他

•事務支援/并發控制:最開始 Hudi 隻支援 Table 級别的隔離,比如同時對同一個 Table 産生 commits,那麼隻有一個 commit 會成功,另一個 commit 會送出失敗;後續做了一定的改進,将 Table 級别的隔離做到的 FileGroup 級别。這個在資料庫系統中也是非常常見的做法。•preCombine 能力:HoodieRecordPayload 的實作如下,每條記錄會被封裝成一個 Payload,使用者可以在 Payload 中實作相同 PK 的資料 merge 操作,預設是取最新的一條記錄。

public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Serializable {
  T preCombine(T oldValue);
}
           

總結

本文闡述了 Hudi 相關的基本概念,如 Timeline、COW/MOR 表的定義,以及檔案分布等内容,并針對核心功能如 Upsert、增量讀取進行了原了解釋,可以看到 Hudi 是以一個具有豐富功能的 Format 的形式存在,使用 merge-on-read 思想來實作 Upsert 的傳輸語義。

下一篇會更新和 Iceberg 技術棧相關的文章,敬請期待。

References

[1]

 示例: https://hudi.apache.org/docs/quick-start-guide#incremental-query