天天看點

Apache Hudi 資料湖概述

文章目錄

  • ​​前言​​
  • ​​hudi是什麼​​
  • ​​hudi 實作更新的基本原理​​
  • ​​基礎檔案​​
  • ​​增量日志檔案​​
  • ​​檔案組​​
  • ​​檔案的版本​​
  • ​​COW表資料的更新​​
  • ​​MOR表資料的更新​​
  • ​​MOR 表的compact​​
  • ​​hudi 不同表格式的讀取方式​​
  • ​​COW表資料的讀取​​
  • ​​MOR表資料的讀取​​
  • ​​不同表格式的特性​​
  • ​​Hudi的應用​​
  • ​​mysql cdc​​
  • ​​分庫資料查詢​​
  • ​​異常資料準實時分析​​
  • ​​準實時圖表繪制​​

前言

介紹本文之前,先給大家說一些簡單的大資料概念。在大資料體系中,我們常用的存儲是 ​

​HDFS(Hadoop Distributed File System)​

​​,常用的計算是 ​

​map-reduce​

​​、​

​spark​

​​、​

​flink​

​​。對于 ​

​HDFS(Hadoop Distributed File System)​

​​而言,它是一個分布式的檔案系統,資料平台的所有資料都存儲在裡面,最佳的使用實踐是:一次寫入,多次讀取。是以大資料的計算基本上都是批處理計算(對有界的、确定資料範圍的資料進行計算,不具備實時性),也就是 ​

​T-1​

​​ 的計算,​

​T+1​

​​ 才能知道資料的計算結果(比如在前些年使用者的銀行轉賬,隔天才能收到)。但在網際網路高速發展的今天,​

​T+1​

​ 的資料結果已經無法滿足使用者的需求,大資料也需要做出一些變革。

歸根結底,離線數倉無法支援實時業務的一個原因是不支援更新,或者說不支援高效的更新。舊的更新方式通常使用 overwrite 的方式來重寫舊的資料,但耗時長、代價大。 大資料也有一些支援更新的元件如:​

​hbase​

​​, ​

​kudu​

​​。但這些方式要麼是更新的代價較大,要麼就是無法與大資料的計算引擎充分結合。但是近兩年湧出的資料湖産品可以完美的解決大資料的更新問題,對于資料湖而言,更新并不是最主要的,它還支援資料庫具備的​

​ACID​

​​等特性,資料的曆史鏡像查詢、資料的增量讀取、​

​savepoint​

​​ 等等。除此之外它更打通了和其它大資料計算引擎的結合,使得我們的資料有了統一的存儲地方:資料湖​​1​​。

hudi是什麼

​Apache Hudi​

​ 是一個流式資料湖平台,支援對海量資料快速更新。内置表格式,支援事務的存儲層、 一系清單服務、資料服務(開箱即用的攝取工具)以及完善的運維監控工具

Apache Hudi 資料湖概述

上圖從下到上,由左向右看

  • ​hudi​

    ​​ 底層的資料可以存儲到​

    ​hdfs​

    ​​、​

    ​s3​

    ​​、​

    ​azure​

    ​​、​

    ​alluxio​

    ​ 等存儲
  • ​hudi​

    ​​ 可以使用​

    ​spark/flink​

    ​​ 計算引擎來消費​

    ​kafka、pulsar​

    ​​ 等消息隊列的資料,而這些資料可能來源于​

    ​app​

    ​​ 或者微服務的業務資料、日志資料,也可以是​

    ​mysql​

    ​​ 等資料庫的​

    ​binlog​

    ​ 日志資料
  • ​spark/hudi​

    ​​ 首先将這些資料處理為​

    ​hudi​

    ​​ 格式的​

    ​row tables​

    ​​ (原始表),然後這張原始表可以被​

    ​Incremental ETL​

    ​​ (增量處理)生成一張​

    ​hudi​

    ​​ 格式的​

    ​derived tables​

    ​ 派生表
  • ​hudi​

    ​​ 支援的查詢引擎有:​

    ​trino​

    ​​、​

    ​hive​

    ​​、​

    ​impala​

    ​​、​

    ​spark​

    ​​、​

    ​presto​

    ​ 等
  • 支援​

    ​spark、flink、map-reduce​

    ​​ 等計算引擎繼續對​

    ​hudi​

    ​ 的資料進行再次加工處理

上圖資訊總結一下:

  1. 支援大多數存儲引擎
  2. 支援消費消息隊列資料
  3. 支援多引擎查詢分析
  4. 支援大資料的計算引擎
  5. 支援增量處理

hudi 實作更新的基本原理

在介紹之前,友善大家了解,大家可以先記住一個基本原理,​

​hudi​

​​ 資料的更新都是資料檔案的追加。另外 ​

​hudi​

​​ 根據不同資料更新方式劃分了兩種表格式,分别為 ​

​MOR(MERGE ON READ)​

​​ 讀時合并和 ​

​COW(COPY ON WRITE)​

​ 寫時合并。通過名字大家可以簡單了解下兩種表格式,下面會介紹具體更新細節。

基礎檔案

Apache Hudi 資料湖概述

​hudi​

​​ 底層,将資料存儲到基礎檔案中,該檔案以 ​

​parquet​

​​、​

​orc​

​​ 等列存格式存放。該格式在大資料存儲中被廣泛使用,列裁剪​​2​​​ 謂詞下推​​3​​等特性,對于資料的讀取非常高效。

增量日志檔案

Apache Hudi 資料湖概述

​hudi​

​​ 支援兩種表格式,​

​COW​

​​ 和 ​

​MOR​

​​,在 ​

​MOR​

​​ 格式中,更新的資料将被寫入到增量日志檔案(​

​delta log​

​​)中。每次資料的更新,都會在對應的基礎檔案上追加一個增量日志檔案。在資料被查詢​​4​​​時,​

​hudi​

​ 将會實時合并基礎檔案和增量日志檔案。

檔案組

Apache Hudi 資料湖概述

通常我們的資料在一個分區内會包含很多的基礎檔案和增量日志檔案,每一個基礎檔案和在它之上的增量檔案組成一個檔案組。在 ​

​COW​

​​ 格式中,隻有基礎檔案,那麼組成 ​

​fileGroup​

​ 檔案就是:同一個檔案的多次更新的基礎檔案的集合。

檔案的版本

Apache Hudi 資料湖概述

由于 ​

​hudi​

​​ 資料的更新都是檔案的更新,那麼在更新後,會生成資料新的版本。上圖展示的是在 ​

​COW​

​​ 表格式下,資料的更新生成一個 ​

​V2​

​​ 的基礎檔案,​

​V2​

​​ 的基礎檔案資料是增量資料和 ​

​V1​

​ 資料進行合并的結果。

COW表資料的更新

​COW​

​​ 格式下,​

​hudi​

​​ 在對一批資料進行更新時會通過索引查找資料所在的資料檔案在哪裡,如果找到就重寫該檔案,如果找不到将會新增檔案​​5​​​

Apache Hudi 資料湖概述
  • 初始狀态: 共有兩個​

    ​fileGroup​

    ​​,​

    ​fileGroup1​

    ​​ 和​

    ​fileGroup3​

    ​​ ,基礎資料檔案隻有​

    ​base file1 v1​

    ​​ 和​

    ​base file3 v1​

  • 更新後: 從更新後的狀态我們可以看出,有對​

    ​fileGroup1​

    ​​ 進行了更新操作,基礎檔案的版本變成了​

    ​v2​

    ​​。​

    ​fileGroup2​

    ​​ 中新增了一個​

    ​base file2 v1​

    ​​基礎檔案。​

    ​fileGroup3​

    ​ 沒有進行資料的更新,也就沒有檔案的變化

​COW​

​​ 表在對資料進行更新時,需要重寫整個基礎檔案,即使我們隻更新了該檔案的一條資料,該表格式有寫放大的缺點,但該表格式較為簡單,少了 ​

​MOR​

​ 表格式的合并操作,對于頻繁讀的表比較推薦該格式

MOR表資料的更新

Apache Hudi 資料湖概述

​MOR​

​​ 表和​

​COW​

​​ 表類似,在更新之前首先會使用索引來查找更新的資料檔案位置,找到之後要更新的資料會在基礎檔案的基礎之上新增一個 ​

​delta log​

​ 增量檔案。如果沒有找到,将會新增一個基礎檔案。

​MOR​

​​ 表的資料在讀取時,會實時合并基礎檔案和增量檔案傳回給使用者,如果增量檔案較多,會導緻查詢資料結果較慢。

相比較 ​​

​COW​

​​ 表,​

​MOR​

​​ 的讀取有非常明顯的讀放大,為了減少增量檔案的數量,​

​MOR​

​​ 表需要定期的進行 ​

​compact​

​ 壓縮操作。

MOR 表的compact

Apache Hudi 資料湖概述

​MOR​

​​ 表 ​

​compact​

​​ 是一個單獨的操作,​

​compact​

​​ 時,​

​hudi​

​​ 首先讀取所有的增量資料,并将其存到 ​

​map​

​​ 中, 然後使用的生産者消費者模式進行資料的合并。生産者将會讀取基礎檔案,然後将資料放入到記憶體隊列,消費者消費隊列的資料并和 ​

​map​

​​ 中的增量資料進行對比來選擇最新的資料。最後将所有資料寫入到新的 ​

​v2​

​ 檔案中。

hudi 不同表格式的讀取方式

COW表資料的讀取

Apache Hudi 資料湖概述

首先看下面的 ​

​timeline​

​​,在 ​

​10:00​

​​,​

​10:05​

​​,​

​10:10​

​​ 三個時間時,​

​hudi​

​​ 送出了三次 ​

​commit​

​。

  • ​10:00​

    ​​ 時,新增了兩個基礎檔案​

    ​fileid1​

    ​​ 和​

    ​fileid2​

  • ​10:05​

    ​​ 時,​

    ​fileid1​

    ​​,​

    ​fileid2​

    ​​,​

    ​fileid3​

    ​ 各有一份基礎資料
  • ​10:10​

    ​​ 時,進行了一次送出,對于基礎檔案​

    ​fileid1​

    ​​、​

    ​fileid2​

    ​​ 各更新了一個版本,​

    ​fileid3​

    ​​ 沒有進行資料的更新,新增了一個​

    ​fileid4​

對該 ​

​hudi​

​ 表進行查詢時,分析下選擇的基礎檔案有哪些。

  • 如果我們執行的查詢是​

    ​select count(*) QUERY STARTING AFTER COMMIT 10:10​

    ​​,也就是說我們要查詢送出時間在​

    ​10:10​

    ​​ 之後的資料量,那麼此時我們會讀取到​

    ​10:10​

    ​​ 分對應的基礎檔案鏡像。也就是​

    ​fileid1 10:10​

    ​​ 的基礎檔案、​

    ​fileid2 10:10​

    ​​ 的基礎檔案、​

    ​fileid3 10:05​

    ​​ 的基礎檔案、​

    ​fileid4 10:10​

    ​ 的基礎檔案。
  • 如果我們執行的查詢是​

    ​select count(*) QUERY STARTING BEFORE COMMIT 10:10​

    ​​,也就是查詢​

    ​10:05​

    ​​ 時對應的基礎檔案鏡像,此時掃描出來的基礎檔案有:​

    ​fileid1 10:05​

    ​​ 的基礎檔案、​

    ​fileid2 10:05​

    ​​ 的基礎檔案、​

    ​fileid3 10:05​

    ​ 的基礎檔案。
通過上面,大家應該可以看出來,​

​hudi​

​​ 實際上是通過 ​

​MVCC​

​ 來實作事務的隔離、不同時期鏡像的讀取。

MOR表資料的讀取

在對 ​

​MOR​

​​ 表資料讀取時和 ​

​COW​

​​ 表類似,隻不過 ​

​MOR​

​​ 表多了 ​

​RO​

​​ 表和 ​

​RT​

​ 表。

​RO​

​​ 表又叫讀優化表,在讀取資料時隻讀取基礎檔案,此時會獲得和 ​

​COW​

​表相同的讀取性能,但是缺點也很明顯,無法讀取到最新的增量資料。

​RT​

​ 表又叫實時表,即讀取資料時實時合并基礎檔案和增量日志檔案,讀取的資料較全,但是會耗費讀取端較大的資源。

Apache Hudi 資料湖概述

仍然先看圖最下方的 ​

​timeline​

  • ​10:01​

    ​​、​

    ​10:02​

    ​​、​

    ​10:03​

    ​​、​

    ​10:04​

    ​ 共四次的增量日志更新操作
  • ​10:05​

    ​​ 進行了一次​

    ​compact​

    ​ 操作
  • ​10:06​

    ​​、​

    ​10:07​

    ​​、​

    ​10:08​

    ​​、​

    ​10:09​

    ​​、​

    ​10:10​

    ​ 四次增量日志更新操作

然後我們逐個分析

  • ​10:05​

    ​​ 時,進行了​

    ​compact​

    ​​ 操作,将會合并之前的所有增量日志檔案,合并後資料的基礎檔案有​

    ​fileid1​

    ​​、​

    ​fileid2​

    ​​、​

    ​fileid3​

  • ​10:06​

    ​​ 時對​

    ​fileid1​

    ​​、​

    ​fileid4​

    ​​ 進行了更新,各新增了一個​

    ​10:06​

    ​ 的增量日志檔案
  • ​10:07​

    ​​ 時對​

    ​fileid2​

    ​​ 進行了更新,新增一個​

    ​10:07​

    ​ 的增量日志檔案
  • ​10:08​

    ​​ 時對​

    ​fileid1​

    ​​ 進行了更新,新增一個​

    ​10:08​

    ​ 的增量日志檔案
  • ​10:09​

    ​​ 時對​

    ​fileid1​

    ​​、​

    ​fileid2​

    ​​ 進行了更新,各新增一個​

    ​10:09​

    ​ 的增量日志檔案
  • ​10:10​

    ​​ 時對​

    ​fileid2​

    ​​、​

    ​fileid4​

    ​​ 進行了更新,新增一個​

    ​10:10​

    ​ 的增量日志檔案

然後在對資料讀取時,如果我們執行的查詢是​

​QUERY ON READ OPTIMIZED TABLE AFTER COMMIT 10:10​

​​即讀取 ​

​10:10​

​​ 時的 ​

​RT​

​​ 表,上面說了 ​

​RT​

​​ 表隻讀區基礎檔案,此時掃描到的檔案有: ​

​fileid1 10:05​

​​ 的基礎檔案、​

​fileid2 10:05​

​​ 的基礎檔案、​

​fileid3 10:05​

​​ 的基礎檔案。

如果我們執行的查詢是​​

​QUERY ON NEAR REAL-TIME TABLE AFTER COMMIT 10:10​

​​ 即讀取 ​

​10:10​

​​ 時的 ​

​RT​

​​ 表,該表會合并基礎檔案和增量日志檔案,是以掃描到的資料檔案有:( ​

​fileid1 10:05​

​​ 的基礎檔案,​

​10:06、10:08、10:09​

​​ 的增量日志檔案)、(​

​fileid2 10:05​

​​ 的基礎檔案,​

​10:07、10:09、10:10​

​​ 的增量日志檔案)、​

​fileid3​

​​ 的基礎檔案、(​

​fileid4 10:06、10:10​

​ 的增量日志檔案)

不同表格式的特性

Apache Hudi 資料湖概述
  • 寫入延遲:由于寫入期間發生同步合并,與​

    ​MOR​

    ​​ 相比 ,​

    ​COW​

    ​ 具有更高的寫入延遲
  • 讀取延遲:由于我們在​

    ​MOR​

    ​​ 中進行實時合并,是以與​

    ​MOR​

    ​​ 相比​

    ​COW​

    ​​ 往往具有更高的讀取延遲。但是如果根據需求配置了合适的壓縮政策,​

    ​MOR​

    ​ 可以很好地發揮作用
  • 更新代價:由于我們為每批寫入建立更新的資料檔案,是以​

    ​COW​

    ​​ 的​

    ​I/O​

    ​​ 成本将更高。由于更新進入增量日志檔案,​

    ​MOR​

    ​​ 的​

    ​I/O​

    ​ 成本非常低
  • 寫放大:假設有一個大小為​

    ​100Mb​

    ​​ 的資料檔案,并且每次更新​

    ​10%​

    ​​ 的記錄進行​

    ​4​

    ​​ 批寫入,​

    ​4​

    ​​ 次寫入後,​

    ​Hudi​

    ​​ 将擁有​

    ​5​

    ​​ 個大小為​

    ​100Mb​

    ​​ 的​

    ​COW​

    ​​ 資料檔案,總大小約​

    ​500Mb​

    ​​。​

    ​MOR​

    ​​ 的情況并非如此,由于更新進入日志檔案,寫入放大保持在最低限度。對于上面的例子,假設壓縮還沒有開始,在​

    ​4​

    ​​ 次寫入後,我們将有​

    ​1x100Mb​

    ​​ 的檔案和​

    ​4​

    ​​ 個增量日志檔案(​

    ​10Mb​

    ​​) 的大小約​

    ​140Mb​

Hudi的應用

說了這麼多,目前我們能運用 ​

​hudi​

​ 做什麼呢?

mysql cdc

​CDC​

​​ 的全稱是 ​

​Change Data Capture​

​​ ,​

​CDC​

​​ 技術是資料庫領域一個常用技術,主要面向資料庫的變更,是一種用于捕獲資料庫中資料變更的技術,可以把資料庫表中資料的變化以實時或近實時通知到下遊消費者。在數倉中,每天淩晨需要把業務庫的業務資料同步到數倉中,來為下遊 ​

​ETL​

​​的計算做資料準備。絕大多數公司都是使用資料內建工具(​

​dataX​

​​、​

​flinkX​

​​、​

​sqoop​

​​)等來實作,但是這些方式都是使用SQL的方式如​

​SELECT * FROM XX WHERE 1=1​

​​來全量抽取。此種方式給資料庫帶來了很大的壓力,甚至會影響到業務。那麼有沒有一種優雅的方式呢?比如 ​

​binlog​

​?

  • 基于查詢:這種​

    ​CDC​

    ​​ 技術是入侵式的,需要在資料源執行​

    ​SQL​

    ​​ 語句。使用這種技術實作​

    ​CDC​

    ​ 會影響資料源的性能。通常需要掃描包含大量記錄的整個表。
  • 基于日志:這種​

    ​CDC​

    ​​ 技術是非侵入性的,不需要在資料源執行​

    ​SQL​

    ​​ 語句。通過讀取源資料庫的日志檔案以識别對源庫表的建立、修改或删除資料

    中間件團隊目前已經對​​

    ​mysql​

    ​​ 的​

    ​binlog​

    ​​ 進行了解析,我們隻需要送出表的​

    ​binlog​

    ​​ 監聽申請,即可消費對應的​

    ​kafka topic​

    ​ 來消費資料。
  • Apache Hudi 資料湖概述
  • 觸發間隔:​

    ​spark​

    ​​ 結構化流每​

    ​15​

    ​​ 分鐘一個批次,處理這段時間内産生的​

    ​binlog​

    ​​ 消息并寫入到資料湖。

    去重方式:首選根據主鍵或者唯一鍵(支援多個)進行聚合,取​​

    ​gmt_modified​

    ​​ 最大的一條。考慮到在高并發等極端情況下會有同一時間修改同一條記錄的情況,是以在​

    ​gmt_modified​

    ​​ 相等的情況下,根據資料的​

    ​offset​

    ​​ 大小比較​​6​​​。

    幂等:​​

    ​spark​

    ​​ 結構化流對于​

    ​batch process​

    ​​ 僅僅支援​

    ​at least once​

    ​​,但是在​

    ​hudi upsert​

    ​​ 的特性下,實作了資料的幂等。

    心跳:每批次增量資料更新完後,将會更新該表的心跳時間,如果在一定時間内心跳未更新,則資料可能有問題,此時任務将會告警,人工介入處理

    通過這種方式我們可以近乎實時的捕捉​​

    ​mysql​

    ​​ 資料入湖,所有的數倉采集任務都從資料湖中拉取,将離線數倉和​

    ​mysql​

    ​​ 解耦,對​

    ​mysql​

    ​​ 無壓力,節省從庫高配置的費用,對業務不再有影響,并且任務抽取時間從之前到淩晨​

    ​3​

    ​​點(錯峰執行,避免對​

    ​mysql​

    ​​ 執行個體較大的壓力)提前到淩晨​

    ​20​

    ​​ 分附近,縮短了​

    ​9​

    ​​ 倍,下遊任務計算提前近​

    ​3​

    ​ 個小時

分庫資料查詢

在使用 ​

​mysql​

​​ 時,經常會遇到分庫分表的資料查詢比較困難,需要知道資料在哪個執行個體,哪個庫,哪張表,查詢較為複雜。如果把這些資料全部通過 ​

​CDC​

​ 同步到資料湖中,我們就可以在即席查詢上直接查詢這張表,不需要關心資料在哪裡,資料湖會幫我們把這些分庫分表的資料采集到同一張表中。

異常資料準實時分析

在業務場景中,我們會統計一些使用者,或者裝置的請求/上報資料的數量,比如最近 ​

​5​

​​ 分鐘的請求數量,當出現某些使用者/裝置的上報資料量異常時,簡單分析後,手動對這些使用者/裝置進行降級。​

​hudi​

​​ 可以近實時的采集這些資料,然後開發在即席查詢上查詢異常裝置​

​SELECT COUNT(1) CNT,USER_ID FROM HUDI_TABLE GROUP BY USER_ID ORDER BY CNT DESC LIMIT 100​

​,可以達到這種目的。

準實時圖表繪制

  1. 我更願意稱之為資料湖格式,​

    ​HDFS​

    ​在大資料的地位不可動搖,資料湖底層的資料還是存儲到 ​

    ​HDFS​

    ​ 之上。 ​​↩︎​​
  2. 列式存儲格式,在列裁剪生效時,隻需掃描檔案指定列即可,不需要讀取目前檔案的所有資料​​↩︎​​
  3. 列式存儲格式,謂詞下推生效時,通過讀取​

    ​footer​

    ​ 的中繼資料,比如該檔案某列的最大值、最小值,可以快速判斷要查詢的資料是否在該基礎檔案 ​​↩︎​​
  4. ​MOR​

    ​​ 表分為 ​

    ​RT​

    ​ 讀和 ​

    ​RO​

    ​ 讀,在 ​

    ​RT​

    ​ 讀時會合并基礎檔案和增量檔案,但在 ​

    ​RO​

    ​ 讀時,隻讀取基礎檔案,不會進行增量檔案的合并,下文會介紹。 ​​↩︎​​
  5. 友善大家了解,這裡就寫為新增檔案,實際上​

    ​hudi​

    ​ 會把插入的資料優先寫入到檔案較小的基礎檔案中,如果沒有較小的基礎檔案才會建立一個新的檔案。 ​​↩︎​​
  6. ​kafka​

    ​​ 發送 ​

    ​binlog​

    ​ 消息時,要保證同一個主鍵的資料發送到相同的​

    ​partition​

    ​,保證分區資料的有序性。 ​​↩︎​​