文章目錄
- 前言
- hudi是什麼
- hudi 實作更新的基本原理
- 基礎檔案
- 增量日志檔案
- 檔案組
- 檔案的版本
- COW表資料的更新
- MOR表資料的更新
- MOR 表的compact
- hudi 不同表格式的讀取方式
- COW表資料的讀取
- MOR表資料的讀取
- 不同表格式的特性
- Hudi的應用
- mysql cdc
- 分庫資料查詢
- 異常資料準實時分析
- 準實時圖表繪制
前言
介紹本文之前,先給大家說一些簡單的大資料概念。在大資料體系中,我們常用的存儲是
HDFS(Hadoop Distributed File System)
,常用的計算是
map-reduce
、
spark
、
flink
。對于
HDFS(Hadoop Distributed File System)
而言,它是一個分布式的檔案系統,資料平台的所有資料都存儲在裡面,最佳的使用實踐是:一次寫入,多次讀取。是以大資料的計算基本上都是批處理計算(對有界的、确定資料範圍的資料進行計算,不具備實時性),也就是
T-1
的計算,
T+1
才能知道資料的計算結果(比如在前些年使用者的銀行轉賬,隔天才能收到)。但在網際網路高速發展的今天,
T+1
的資料結果已經無法滿足使用者的需求,大資料也需要做出一些變革。
歸根結底,離線數倉無法支援實時業務的一個原因是不支援更新,或者說不支援高效的更新。舊的更新方式通常使用 overwrite 的方式來重寫舊的資料,但耗時長、代價大。 大資料也有一些支援更新的元件如:,
hbase
。但這些方式要麼是更新的代價較大,要麼就是無法與大資料的計算引擎充分結合。但是近兩年湧出的資料湖産品可以完美的解決大資料的更新問題,對于資料湖而言,更新并不是最主要的,它還支援資料庫具備的
kudu
等特性,資料的曆史鏡像查詢、資料的增量讀取、
ACID
等等。除此之外它更打通了和其它大資料計算引擎的結合,使得我們的資料有了統一的存儲地方:資料湖1。
savepoint
hudi是什麼
Apache Hudi
是一個流式資料湖平台,支援對海量資料快速更新。内置表格式,支援事務的存儲層、 一系清單服務、資料服務(開箱即用的攝取工具)以及完善的運維監控工具
上圖從下到上,由左向右看
-
底層的資料可以存儲到hudi
、hdfs
、s3
、azure
等存儲alluxio
-
可以使用hudi
計算引擎來消費spark/flink
等消息隊列的資料,而這些資料可能來源于kafka、pulsar
或者微服務的業務資料、日志資料,也可以是app
等資料庫的mysql
日志資料binlog
-
首先将這些資料處理為spark/hudi
格式的hudi
(原始表),然後這張原始表可以被row tables
(增量處理)生成一張Incremental ETL
格式的hudi
派生表derived tables
-
支援的查詢引擎有:hudi
、trino
、hive
、impala
、spark
等presto
- 支援
等計算引擎繼續對spark、flink、map-reduce
的資料進行再次加工處理hudi
上圖資訊總結一下:
- 支援大多數存儲引擎
- 支援消費消息隊列資料
- 支援多引擎查詢分析
- 支援大資料的計算引擎
- 支援增量處理
hudi 實作更新的基本原理
在介紹之前,友善大家了解,大家可以先記住一個基本原理,
hudi
資料的更新都是資料檔案的追加。另外
hudi
根據不同資料更新方式劃分了兩種表格式,分别為
MOR(MERGE ON READ)
讀時合并和
COW(COPY ON WRITE)
寫時合并。通過名字大家可以簡單了解下兩種表格式,下面會介紹具體更新細節。
基礎檔案
hudi
底層,将資料存儲到基礎檔案中,該檔案以
parquet
、
orc
等列存格式存放。該格式在大資料存儲中被廣泛使用,列裁剪2 謂詞下推3等特性,對于資料的讀取非常高效。
增量日志檔案
hudi
支援兩種表格式,
COW
和
MOR
,在
MOR
格式中,更新的資料将被寫入到增量日志檔案(
delta log
)中。每次資料的更新,都會在對應的基礎檔案上追加一個增量日志檔案。在資料被查詢4時,
hudi
将會實時合并基礎檔案和增量日志檔案。
檔案組
通常我們的資料在一個分區内會包含很多的基礎檔案和增量日志檔案,每一個基礎檔案和在它之上的增量檔案組成一個檔案組。在
COW
格式中,隻有基礎檔案,那麼組成
fileGroup
檔案就是:同一個檔案的多次更新的基礎檔案的集合。
檔案的版本
由于
hudi
資料的更新都是檔案的更新,那麼在更新後,會生成資料新的版本。上圖展示的是在
COW
表格式下,資料的更新生成一個
V2
的基礎檔案,
V2
的基礎檔案資料是增量資料和
V1
資料進行合并的結果。
COW表資料的更新
COW
格式下,
hudi
在對一批資料進行更新時會通過索引查找資料所在的資料檔案在哪裡,如果找到就重寫該檔案,如果找不到将會新增檔案5
- 初始狀态: 共有兩個
,fileGroup
和fileGroup1
,基礎資料檔案隻有fileGroup3
和base file1 v1
base file3 v1
- 更新後: 從更新後的狀态我們可以看出,有對
進行了更新操作,基礎檔案的版本變成了fileGroup1
。v2
中新增了一個fileGroup2
基礎檔案。base file2 v1
沒有進行資料的更新,也就沒有檔案的變化fileGroup3
表在對資料進行更新時,需要重寫整個基礎檔案,即使我們隻更新了該檔案的一條資料,該表格式有寫放大的缺點,但該表格式較為簡單,少了
COW
表格式的合并操作,對于頻繁讀的表比較推薦該格式
MOR
MOR表資料的更新
MOR
表和
COW
表類似,在更新之前首先會使用索引來查找更新的資料檔案位置,找到之後要更新的資料會在基礎檔案的基礎之上新增一個
delta log
增量檔案。如果沒有找到,将會新增一個基礎檔案。
MOR
表的資料在讀取時,會實時合并基礎檔案和增量檔案傳回給使用者,如果增量檔案較多,會導緻查詢資料結果較慢。
相比較
COW
表,
MOR
的讀取有非常明顯的讀放大,為了減少增量檔案的數量,
MOR
表需要定期的進行
compact
壓縮操作。
MOR 表的compact
MOR
表
compact
是一個單獨的操作,
compact
時,
hudi
首先讀取所有的增量資料,并将其存到
map
中, 然後使用的生産者消費者模式進行資料的合并。生産者将會讀取基礎檔案,然後将資料放入到記憶體隊列,消費者消費隊列的資料并和
map
中的增量資料進行對比來選擇最新的資料。最後将所有資料寫入到新的
v2
檔案中。
hudi 不同表格式的讀取方式
COW表資料的讀取
首先看下面的
timeline
,在
10:00
,
10:05
,
10:10
三個時間時,
hudi
送出了三次
commit
。
-
時,新增了兩個基礎檔案10:00
和fileid1
fileid2
-
時,10:05
,fileid1
,fileid2
各有一份基礎資料fileid3
-
時,進行了一次送出,對于基礎檔案10:10
、fileid1
各更新了一個版本,fileid2
沒有進行資料的更新,新增了一個fileid3
fileid4
對該
hudi
表進行查詢時,分析下選擇的基礎檔案有哪些。
- 如果我們執行的查詢是
,也就是說我們要查詢送出時間在select count(*) QUERY STARTING AFTER COMMIT 10:10
之後的資料量,那麼此時我們會讀取到10:10
分對應的基礎檔案鏡像。也就是10:10
的基礎檔案、fileid1 10:10
的基礎檔案、fileid2 10:10
的基礎檔案、fileid3 10:05
的基礎檔案。fileid4 10:10
- 如果我們執行的查詢是
,也就是查詢select count(*) QUERY STARTING BEFORE COMMIT 10:10
時對應的基礎檔案鏡像,此時掃描出來的基礎檔案有:10:05
的基礎檔案、fileid1 10:05
的基礎檔案、fileid2 10:05
的基礎檔案。fileid3 10:05
通過上面,大家應該可以看出來, 實際上是通過
hudi
來實作事務的隔離、不同時期鏡像的讀取。
MVCC
MOR表資料的讀取
在對
MOR
表資料讀取時和
COW
表類似,隻不過
MOR
表多了
RO
表和
RT
表。
RO
表又叫讀優化表,在讀取資料時隻讀取基礎檔案,此時會獲得和
COW
表相同的讀取性能,但是缺點也很明顯,無法讀取到最新的增量資料。
RT
表又叫實時表,即讀取資料時實時合并基礎檔案和增量日志檔案,讀取的資料較全,但是會耗費讀取端較大的資源。
仍然先看圖最下方的
timeline
-
、10:01
、10:02
、10:03
共四次的增量日志更新操作10:04
-
進行了一次10:05
操作compact
-
、10:06
、10:07
、10:08
、10:09
四次增量日志更新操作10:10
然後我們逐個分析
-
時,進行了10:05
操作,将會合并之前的所有增量日志檔案,合并後資料的基礎檔案有compact
、fileid1
、fileid2
fileid3
-
時對10:06
、fileid1
進行了更新,各新增了一個fileid4
的增量日志檔案10:06
-
時對10:07
進行了更新,新增一個fileid2
的增量日志檔案10:07
-
時對10:08
進行了更新,新增一個fileid1
的增量日志檔案10:08
-
時對10:09
、fileid1
進行了更新,各新增一個fileid2
的增量日志檔案10:09
-
時對10:10
、fileid2
進行了更新,新增一個fileid4
的增量日志檔案10:10
然後在對資料讀取時,如果我們執行的查詢是
QUERY ON READ OPTIMIZED TABLE AFTER COMMIT 10:10
即讀取
10:10
時的
RT
表,上面說了
RT
表隻讀區基礎檔案,此時掃描到的檔案有:
fileid1 10:05
的基礎檔案、
fileid2 10:05
的基礎檔案、
fileid3 10:05
的基礎檔案。
如果我們執行的查詢是
QUERY ON NEAR REAL-TIME TABLE AFTER COMMIT 10:10
即讀取
10:10
時的
RT
表,該表會合并基礎檔案和增量日志檔案,是以掃描到的資料檔案有:(
fileid1 10:05
的基礎檔案,
10:06、10:08、10:09
的增量日志檔案)、(
fileid2 10:05
的基礎檔案,
10:07、10:09、10:10
的增量日志檔案)、
fileid3
的基礎檔案、(
fileid4 10:06、10:10
的增量日志檔案)
不同表格式的特性
- 寫入延遲:由于寫入期間發生同步合并,與
相比 ,MOR
具有更高的寫入延遲COW
- 讀取延遲:由于我們在
中進行實時合并,是以與MOR
相比MOR
往往具有更高的讀取延遲。但是如果根據需求配置了合适的壓縮政策,COW
可以很好地發揮作用MOR
- 更新代價:由于我們為每批寫入建立更新的資料檔案,是以
的COW
成本将更高。由于更新進入增量日志檔案,I/O
的MOR
成本非常低I/O
- 寫放大:假設有一個大小為
的資料檔案,并且每次更新100Mb
的記錄進行10%
批寫入,4
次寫入後,4
将擁有Hudi
個大小為5
的100Mb
資料檔案,總大小約COW
。500Mb
的情況并非如此,由于更新進入日志檔案,寫入放大保持在最低限度。對于上面的例子,假設壓縮還沒有開始,在MOR
次寫入後,我們将有4
的檔案和1x100Mb
個增量日志檔案(4
) 的大小約10Mb
140Mb
Hudi的應用
說了這麼多,目前我們能運用
hudi
做什麼呢?
mysql cdc
CDC
的全稱是
Change Data Capture
,
CDC
技術是資料庫領域一個常用技術,主要面向資料庫的變更,是一種用于捕獲資料庫中資料變更的技術,可以把資料庫表中資料的變化以實時或近實時通知到下遊消費者。在數倉中,每天淩晨需要把業務庫的業務資料同步到數倉中,來為下遊
ETL
的計算做資料準備。絕大多數公司都是使用資料內建工具(
dataX
、
flinkX
、
sqoop
)等來實作,但是這些方式都是使用SQL的方式如
SELECT * FROM XX WHERE 1=1
來全量抽取。此種方式給資料庫帶來了很大的壓力,甚至會影響到業務。那麼有沒有一種優雅的方式呢?比如
binlog
?
- 基于查詢:這種
技術是入侵式的,需要在資料源執行CDC
語句。使用這種技術實作SQL
會影響資料源的性能。通常需要掃描包含大量記錄的整個表。CDC
- 基于日志:這種
技術是非侵入性的,不需要在資料源執行CDC
SQL
語句。通過讀取源資料庫的日志檔案以識别對源庫表的建立、修改或删除資料
中間件團隊目前已經對
的mysql
進行了解析,我們隻需要送出表的binlog
監聽申請,即可消費對應的binlog
來消費資料。kafka topic
- 觸發間隔:
結構化流每spark
分鐘一個批次,處理這段時間内産生的15
binlog
消息并寫入到資料湖。
去重方式:首選根據主鍵或者唯一鍵(支援多個)進行聚合,取
最大的一條。考慮到在高并發等極端情況下會有同一時間修改同一條記錄的情況,是以在gmt_modified
相等的情況下,根據資料的gmt_modified
offset
大小比較6。
幂等:
結構化流對于spark
僅僅支援batch process
,但是在at least once
hudi upsert
的特性下,實作了資料的幂等。
心跳:每批次增量資料更新完後,将會更新該表的心跳時間,如果在一定時間内心跳未更新,則資料可能有問題,此時任務将會告警,人工介入處理
通過這種方式我們可以近乎實時的捕捉
資料入湖,所有的數倉采集任務都從資料湖中拉取,将離線數倉和mysql
解耦,對mysql
無壓力,節省從庫高配置的費用,對業務不再有影響,并且任務抽取時間從之前到淩晨mysql
點(錯峰執行,避免對3
執行個體較大的壓力)提前到淩晨mysql
分附近,縮短了20
倍,下遊任務計算提前近9
個小時3
分庫資料查詢
在使用
mysql
時,經常會遇到分庫分表的資料查詢比較困難,需要知道資料在哪個執行個體,哪個庫,哪張表,查詢較為複雜。如果把這些資料全部通過
CDC
同步到資料湖中,我們就可以在即席查詢上直接查詢這張表,不需要關心資料在哪裡,資料湖會幫我們把這些分庫分表的資料采集到同一張表中。
異常資料準實時分析
在業務場景中,我們會統計一些使用者,或者裝置的請求/上報資料的數量,比如最近
5
分鐘的請求數量,當出現某些使用者/裝置的上報資料量異常時,簡單分析後,手動對這些使用者/裝置進行降級。
hudi
可以近實時的采集這些資料,然後開發在即席查詢上查詢異常裝置
SELECT COUNT(1) CNT,USER_ID FROM HUDI_TABLE GROUP BY USER_ID ORDER BY CNT DESC LIMIT 100
,可以達到這種目的。
準實時圖表繪制
- 我更願意稱之為資料湖格式,
在大資料的地位不可動搖,資料湖底層的資料還是存儲到 HDFS
之上。 ↩︎HDFS
- 列式存儲格式,在列裁剪生效時,隻需掃描檔案指定列即可,不需要讀取目前檔案的所有資料↩︎
- 列式存儲格式,謂詞下推生效時,通過讀取
的中繼資料,比如該檔案某列的最大值、最小值,可以快速判斷要查詢的資料是否在該基礎檔案 ↩︎footer
-
表分為 MOR
讀和 RT
讀,在 RO
讀時會合并基礎檔案和增量檔案,但在 RT
讀時,隻讀取基礎檔案,不會進行增量檔案的合并,下文會介紹。 ↩︎RO
- 友善大家了解,這裡就寫為新增檔案,實際上
會把插入的資料優先寫入到檔案較小的基礎檔案中,如果沒有較小的基礎檔案才會建立一個新的檔案。 ↩︎hudi
-
發送 kafka
消息時,要保證同一個主鍵的資料發送到相同的binlog
,保證分區資料的有序性。 ↩︎partition