天天看點

資料湖Hudi核心概念與架構設計總結

資料湖Hudi核心概念與架構設計總結

編 輯:王知無

彭友們好,我是老彭啊。Hudi是現在非常熱門的資料湖開源方案,非常适合于搭建一個資料湖平台。

有些人認為資料湖肯定與大資料技術體系完全不一樣,是兩個東西,甚至認為他倆沒關系。

但是,你知道Hudi的全稱叫啥麼?就是“Hadoop Updates and Incrementals”

資料湖Hudi核心概念與架構設計總結
資料湖Hudi核心概念與架構設計總結
資料湖Hudi核心概念與架構設計總結

簡單來說,就是基于Hadoop生态,支援HDFS的資料删除和增量更新的技術架構。

是以,Apache Hudi其實本就是從Hadoop生态裡來的,依賴 HDFS 做底層的存儲,是以可以支撐非常大規模的資料存儲。同時基于update和Incrementals兩個原語解決流批一體的存儲問題:

  • Update/Delete 記錄:Hudi 支援更新/删除記錄,使用檔案/記錄級别索引,同時對寫操作提供事務保證。查詢可擷取最新送出的快照來産生結果。
  • 變更流:支援增量擷取表中所有更新/插入/删除的記錄,從指定時間點開始進行增量查詢,可以實作類似 Kafka 的增量消費機制。
資料湖Hudi核心概念與架構設計總結

Hudi設計原則

流式讀/寫:Hudi借鑒了資料庫設計的原理,從零設計,應用于大型資料集記錄流的輸入和輸出。為此,Hudi提供了索引實作,可以将記錄的鍵快速映射到其所在的檔案位置。同樣,對于流式輸出資料,Hudi通過其特殊列添加并跟蹤記錄級的中繼資料,進而可以提供所有發生變更的精确增量流。

自管理:Hudi注意到使用者可能對資料新鮮度(寫友好)與查詢性能(讀/查詢友好)有不同的期望,它支援了三種查詢類型,這些類型提供實時快照,增量流以及稍早的純列資料。在每一步,Hudi都努力做到自我管理(例如自動優化編寫程式的并行性,保持檔案大小)和自我修複(例如:自動復原失敗的送出),即使這樣做會稍微增加運作時成本(例如:在記憶體中緩存輸入資料已分析工作負載)。如果沒有這些内置的操作杠杆/自我管理功能,這些大型流水線的營運成本通常會翻倍。

萬物皆日志:Hudi還具有 append only、雲資料友好的設計,該設計實作了日志結構化存儲系統的原理,可以無縫管理所有雲提供商的資料。

鍵-值資料模型:在寫方面,Hudi表被模組化為鍵值對資料集,其中每條記錄都有一個唯一的記錄鍵。此外,一個記錄鍵還可以包括分區路徑,在該路徑下,可以對記錄進行分區和存儲。這通常有助于減少索引查詢的搜尋空間。

資料湖Hudi核心概念與架構設計總結

Hudi表設計

Hudi表的三個主要元件:

  1. 有序的時間軸中繼資料:類似于資料庫事務日志。
  2. 分層布局的資料檔案:實際寫入表中的資料。
  3. 索引(多種實作方式):映射包含指定記錄的資料集。

另外,針對資料的寫入和查詢,Hudi提供一些非常重要的功能例如upsert、mvvc等。

時間軸TimeLine

Timeline 是 HUDI 用來管理送出(commit)的抽象,每個 commit 都綁定一個固定時間戳,分散到時間線上。在 Timeline 上,每個 commit 被抽象為一個 HoodieInstant,一個 instant 記錄了一次送出 (commit) 的行為、時間戳、和狀态。HUDI 的讀寫 API 通過 Timeline 的接口可以友善的在 commits 上進行條件篩選,對 history 和 on-going 的 commits 應用各種政策,快速篩選出需要操作的目标 commit。

如圖所示:

資料湖Hudi核心概念與架構設計總結

Hudi維護了一條包含在不同的即時時間(instant time)對資料集做的所有instant操作的timeline,進而提供表的即時視圖,同時還有效支援按到達順序進行資料檢索。時間軸類似于資料庫的redo/transaction日志,由一組時間軸執行個體組成。Hudi保證在時間軸上執行的操作的原子性和基于即時時間的時間軸一緻性。時間軸被實作為表基礎路徑下.hoodie中繼資料檔案夾下的一組檔案。具體來說,最新的instant被儲存為單個檔案,而較舊的instant被存檔到時間軸歸檔檔案夾中,以限制writers和queries列出的檔案數量。

一個Hudi 時間軸instant由下面幾個元件構成:

  1. 操作類型:對資料集執行的操作類型;
  2. 即時時間:即時時間通常是一個時間戳(例如:20190117010349),該時間戳按操作開始時間的順序單調增加;
  3. 即時狀态:instant的目前狀态;每個instant都有avro或者json格式的中繼資料資訊,詳細的描述了該操作的狀态以及這個即時時刻instant的狀态。

關鍵的Instant操作類型有:

  1. COMMIT:一次送出表示将一組記錄原子寫入到資料集中;
  2. CLEAN: 删除資料集中不再需要的舊檔案版本的背景活動;
  3. DELTA_COMMIT:将一批記錄原子寫入到MergeOnRead存儲類型的資料集中,其中一些/所有資料都可以隻寫到增量日志中;
  4. COMPACTION: 協調Hudi中差異資料結構的背景活動,例如:将更新從基于行的日志檔案變成列格式。在内部,壓縮表現為時間軸上的特殊送出;
  5. ROLLBACK: 表示送出/增量送出不成功且已復原,删除在寫入過程中産生的所有部分檔案;
  6. SAVEPOINT: 将某些檔案組标記為"已儲存",以便清理程式不會将其删除。在發生災難/資料恢複的情況下,它有助于将資料集還原到時間軸上的某個點;

任何給定的即時都會處于以下狀态之一:

  • REQUESTED:表示已排程但尚未初始化;
  • INFLIGHT: 表示目前正在執行該操作;
  • COMPLETED: 表示在時間軸上完成了該操作.

資料檔案

資料湖Hudi核心概念與架構設計總結

Hudi将表組織成DFS上基本路徑下的檔案夾結構中。如果表是分區的,則在基本路徑下還會有其他的分區,這些分區是包含該分區資料的檔案夾,與Hive表非常類似。每個分區均由相對于基本路徑的分區路徑唯一辨別。在每個分區内,檔案被組織成檔案組,由檔案ID唯一辨別。其中每個切片包含在某個送出/壓縮即時時間生成的基本列檔案(.parquet)以及一組日志檔案(.log*),該檔案包含自生成基本檔案以來對基本檔案的插入/更新。Hudi采用了MVCC設計,壓縮操作會将日志和基本檔案合并以産生新的檔案片,而清理操作則将未使用的/較舊的檔案片删除以回收HDFS上的空間。

下圖展示了一個分區内的檔案結構:

資料湖Hudi核心概念與架構設計總結

檔案版本

一個新的 base commit time 對應一個新的 FileSlice,實際就是一個新的資料版本。HUDI 通過 TableFileSystemView 抽象來管理 table 對應的檔案,比如找到所有最新版本 FileSlice 中的 base file (Copy On Write Snapshot 讀)或者 base + log files(Merge On Read 讀)。通過 Timeline 和 TableFileSystemView 抽象,HUDI 實作了非常便捷和高效的表檔案查找。

檔案格式

Hoodie 的每個 FileSlice 中包含一個 base file (merge on read 模式可能沒有)和多個 log file (copy on write 模式沒有)。

每個檔案的檔案名都帶有其歸屬的 FileID(即 FileGroup Identifier)和 base commit time(即 InstanceTime)。通過檔案名的 group id 組織 FileGroup 的 logical 關系;通過檔案名的 base commit time 組織 FileSlice 的邏輯關系。

HUDI 的 base file (parquet 檔案) 在 footer 的 meta 去記錄了 record key 組成的 BloomFilter,用于在 file based index 的實作中實作高效率的 key contains 檢測。隻有不在 BloomFilter 的 key 才需要掃描整個檔案消滅假陽。

HUDI 的 log (avro 檔案)是自己編碼的,通過積攢資料 buffer 以 LogBlock 為機關寫出,每個 LogBlock 包含 magic number、size、content、footer 等資訊,用于資料讀、校驗和過濾。

索引設計

Hudi通過索引機制提供高效的upsert操作,該機制會将一個記錄鍵+分區路徑組合一緻性的映射到一個檔案ID.這個記錄鍵和檔案組/檔案ID之間的映射自記錄被寫入檔案組開始就不會再改變。簡而言之,這個映射檔案組包含了一組檔案的所有版本。Hudi目前提供了3種索引實作(HBaseIndex、HoodieBloomIndex(HoodieGlobalBloomIndex)、InMemoryHashIndex)來映射一個記錄鍵到包含該記錄的檔案ID。這将使我們無需掃描表中的每條記錄,就可顯著提高upsert速度。

Hudi索引可以根據其查詢分區記錄的能力進行分類:

1. 全局索引:不需要分區資訊即可查詢記錄鍵映射的檔案ID。比如,寫程式可以傳入null或者任何字元串作為分區路徑(partitionPath),但索引仍然會查找到該記錄的位置。全局索引在記錄鍵在整張表中保證唯一的情況下非常有用,但是查詢的消耗随着表的大小呈函數式增加。

2. 非全局索引:與全局索引不同,非全局索引依賴分區路徑(partitionPath),對于給定的記錄鍵,它隻會在給定分區路徑下查找該記錄。這比較适合總是同時生成分區路徑和記錄鍵的場景,同時還能享受到更好的擴充性,因為查詢索引的消耗隻與寫入到該分區下資料集大小有關系。

表類型

Copy On Write

COW表寫的時候資料直接寫入basefile,(parquet)不寫log檔案。是以COW表的檔案片隻包含basefile(一個parquet檔案構成一個檔案片)。這種的存儲方式的Spark DAG相對簡單。關鍵目标是是使用partitioner将tagged Hudi記錄RDD(所謂的tagged是指已經通過索引查詢,标記每條輸入記錄在表中的位置)分成一些列的updates和inserts.為了維護檔案大小,我們先對輸入進行采樣,獲得一個工作負載profile,這個profile記錄了輸入記錄的insert和update、以及在分區中的分布等資訊。把資料從新打包,這樣:

  • 對于updates,該檔案ID的最新版本都将被重寫一次,并對所有已更改的記錄使用新值。
  • 對于inserts,記錄首先打包到每個分區路徑中的最小檔案中,直到達到配置的最大大小。之後的所有剩餘記錄将再次打包到新的檔案組,新的檔案組也會滿足最大檔案大小要求。
資料湖Hudi核心概念與架構設計總結

Copy On Write 類型表每次寫入都會生成一個新的持有base file(對應寫入的 instant time)的 FileSlice。

使用者在snapshot讀取的時候會掃描所有最新的FileSlice下的base file。

Merge On Read

MOR表寫資料時,記錄首先會被快速的寫進日志檔案,稍後會使用時間軸上的壓縮操作将其與基礎檔案合并。根據查詢是讀取日志中的合并快照流還是變更流,還是僅讀取未合并的基礎檔案,MOR表支援多種查詢類型。在高層次上,MOR writer在讀取資料時會經曆與COW writer 相同的階段。這些更新将追加到最新檔案篇的最新日志檔案中,而不會合并。對于insert,Hudi支援兩種模式:

  1. 插入到日志檔案:有可索引日志檔案的表會執行此操作(HBase索引);
  2. 插入parquet檔案:沒有索引檔案的表(例如布隆索引)

與寫時複制(COW)一樣,對已标記位置的輸入記錄進行分區,将所有發往相同檔案id的upsert分到一組。這批upsert會作為一個或多個日志塊寫入日志檔案。Hudi允許用戶端控制日志檔案大小。對于寫時複制(COW)和讀時合并(MOR)writer來說,Hudi的WriteClient是相同的。幾輪資料的寫入将會累積一個或多個日志檔案。這些日志檔案與基本的parquet檔案(如果有)一起構成一個檔案片,而這個檔案片代表該檔案的一個完整版本。

這種表是用途最廣、最進階的表。為寫(可以指定不同的壓縮政策,吸收突發寫流量)和查詢(例如權衡資料的時效性和查詢性能)提供了很大的靈活性。

Merge On Read 表的寫入行為,依據 index 的不同會有細微的差别:

  • 對于 BloomFilter 這種無法對 log file 生成 index 的索引方案,對于 INSERT 消息仍然會寫 base file (parquet format),隻有 UPDATE 消息會 append log 檔案(因為 base file 已經記錄了該 UPDATE 消息的 FileGroup ID)。
  • 對于可以對 log file 生成 index 的索引方案,例如 Flink writer 中基于 state 的索引,每次寫入都是 log format,并且會不斷追加和 roll over。

Merge On Read 表的讀在 READ OPTIMIZED 模式下,隻會讀最近的經過 compaction 的 commit。

資料湖Hudi核心概念與架構設計總結
資料湖Hudi核心概念與架構設計總結

資料讀寫流程

讀流程

Snapshot讀

讀取所有 partiiton 下每個 FileGroup 最新的 FileSlice 中的檔案,Copy On Write 表讀 parquet 檔案,Merge On Read 表讀 parquet + log 檔案

Incremantal讀

根據https://hudi.apache.org/docs/querying_data.html#spark-incr-query描述,目前的 Spark data source 可以指定消費的起始和結束 commit 時間,讀取 commit 增量的資料集。但是内部的實作不夠高效:拉取每個 commit 的全部目标檔案再按照系統字段 hoodie_commit_time apply 過濾條件。

Streaming 讀

HUDI Flink writer 支援實時的增量訂閱,可用于同步 CDC 資料,日常的資料同步 ETL pipeline。Flink 的 streaming 讀做到了真正的流式讀取,source 定期監控新增的改動檔案,将讀取任務下派給讀 task。

寫流程

寫操作

  • UPSERT:預設行為,資料先通過 index 打标(INSERT/UPDATE),有一些啟發式算法決定消息的組織以優化檔案的大小 => CDC 導入
  • INSERT:跳過 index,寫入效率更高 => Log Deduplication
  • BULK_INSERT:寫排序,對大資料量的 Hudi 表初始化友好,對檔案大小的限制 best effort(寫 HFile)

寫流程(UPSERT)

Copy On Write

  • 先對 records 按照 record key 去重
  • 首先對這批資料建立索引 (HoodieKey => HoodieRecordLocation);通過索引區分哪些 records 是 update,哪些 records 是 insert(key 第一次寫入)
  • 對于 update 消息,會直接找到對應 key 所在的最新 FileSlice 的 base 檔案,并做 merge 後寫新的 base file (新的 FileSlice)
  • 對于 insert 消息,會掃描目前 partition 的所有 SmallFile(小于一定大小的 base file),然後 merge 寫新的 FileSlice;如果沒有 SmallFile,直接寫新的 FileGroup + FileSlice

Merge On Read

  • 先對 records 按照 record key 去重(可選)
  • 首先對這批資料建立索引 (HoodieKey => HoodieRecordLocation);通過索引區分哪些 records 是 update,哪些 records 是 insert(key 第一次寫入)
  • 如果是 insert 消息,如果 log file 不可建索引(預設),會嘗試 merge 分區内最小的 base file (不包含 log file 的 FileSlice),生成新的 FileSlice;如果沒有 base file 就新寫一個 FileGroup + FileSlice + base file;如果 log file 可建索引,嘗試 append 小的 log file,如果沒有就新寫一個 FileGroup + FileSlice + base file
  • 如果是 update 消息,寫對應的 file group + file slice,直接 append 最新的 log file(如果碰巧是目前最小的小檔案,會 merge base file,生成新的 file slice)log file 大小達到門檻值會 roll over 一個新的

寫流程(INSERT)

Copy On Write

  • 先對 records 按照 record key 去重(可選)
  • 不會建立 Index
  • 如果有小的 base file 檔案,merge base file,生成新的 FileSlice + base file,否則直接寫新的 FileSlice + base file

Merge On Read

  • 先對 records 按照 record key 去重(可選)
  • 不會建立 Index
  • 如果 log file 可索引,并且有小的 FileSlice,嘗試追加或寫最新的 log file;如果 log file 不可索引,寫一個新的 FileSlice + base file。​
資料湖Hudi核心概念與架構設計總結

總結

主要是我個人收集和翻閱Hudi社群的一些資料過程中的總結。目前Hudi版本到了0.11版本。細節上可能有所差異,以社群為準。

資料湖Hudi核心概念與架構設計總結