天天看點

基于 Apache Hudi 和 Apache Spark Sql 的近實時數倉架構分享

前言

在大資料的計算場景中,根據資料的延遲情況,資料的時效性一般分為離線、準實時、實時。離線計算一般是以天(​

​T​

​​)為界限,比如離線場景最多的就是​

​T-1​

​​ 計算,也就是今天計算昨天産生的資料。準實時計算一般以小時(​

​H​

​​)為界限,比如 ​

​H-1​

​​ 的計算,即目前小時處理上個小時的資料,當然某些業務場景下也有(​

​0.5H-1​

​​)計算的存在。而實時計算一般是以秒為界限,即資料的延遲最大粒度為秒級。對于離線和準實時計算,我們可以在排程系統中通過不同的排程周期實作,而實時計算通常需要一個常駐的任務來進行。對于相同業務,不同的時效要求下,任務計算所需要的資源成本通常為:離線 < 準實時 < 實時。 本文所說的近實時介于實時和準實時之間的一種實效要求,一般在 ​

​1​

​​ 分鐘至 ​

​30​

​ 分鐘之間,在該時延的要求下,一般排程系統也能實作,但具體是使用常駐任務還是排程系統周期排程就看業務所能容忍的最大延遲時間與計算成本之間的權衡了。

近實時架構

在剛剛接觸資料湖的概念以及深入了解其各種特性時,曾 ​

​yy​

​​ 過一種如下的數倉架構。但是直到最近基礎設施才滿足以該方式實作。在之前,這套架構如果想讓數倉/數開同學用起來,成本有些太高。對于離線數倉,為了代碼的可讀性和維護性,我們讓數倉直接在排程平台的界面上使用 ​

​sql​

​​開發。而近實時數倉,由于一些架構的限制,還無法全部使用 ​

​sql​

​​ 來實作,需要配合使用 ​

​java/scala​

​​ 代碼來進行開發,然後把 ​

​jar​

​​ 包釋出到叢集上才能被排程執行。由于無法自動對 ​

​jar​

​ 包進行中繼資料的解析,任務上下遊依賴配置時還需要人為選擇,對代碼的開發、維護和釋出增加了困難。是以這種方法一直沒有推廣開來,隻能由專業的幾個人來做,直到最近,看到了曙光。

基于 Apache Hudi 和 Apache Spark Sql 的近實時數倉架構分享

在數倉中,我們根據業務要求的資料時效性劃分為離線數倉和近實時數倉,其中離線數倉(離線、準實時計算)由 ​

​spark sql​

​​ 實作,近實時數倉由資料湖增量計算實作。在我們内部無論是增量計算還是離線計算,全部在 ​

​dag​

​​ 離線排程平台 ​

​hera​

​ 上進行周期排程。

而對于不同層級,也有不同的計算方式

  • ​ods​

    ​層
  • 近實時數倉:由一個 ​

    ​spark​

    ​​ 或者 ​

    ​flink​

    ​​ 任務來微批同步 ​

    ​binlog​

    ​​、日志檔案、​

    ​kafka​

    ​​ 等實時産生的資料,内部使用的是 ​

    ​spark structured streaming​

    ​​ 來執行,結構化流支援 ​

    ​Trigger once​

    ​​ 的觸發方式,在排程系統周期排程時能自動從上次的 ​

    ​checkpoint​

    ​ 位置繼續消費
  • 離線數倉:為了降低對其它存儲的壓力,對于一些已經存在于近實時數倉的資料,每天零點以快照的方式存儲一份資料到離線數倉,無需重複抽取。對于一些不在近實時數倉的資料直連相應資料源,使用 ​

    ​sqoop​

    ​​、​

    ​dataX​

    ​ 等方式離線同步即可
  • ​dw​

    ​層
  • 近實時數倉: 直接由近實時數倉 ​

    ​ods​

    ​ 層增量計算得出
  • 離線數倉:對于一些離線數倉需要的,近實時數倉存在的表每天零點仍然以快照的方式同步到離線數倉,一些隻有離線需求的業務直接離線計算即可
  • ​ads​

    ​層
  • 近實時數倉:和 ​

    ​dw​

    ​ 層類似。
  • 離線數倉:和 ​

    ​dw​

    ​ 層類似。

最終,無論是離線數倉還是近實時數倉的資料以直連、導出或者接口的方式提供給外部。

實際上這裡有一個問題,為何不能所有的離線計算改為近實時計算呢?考慮主要有兩個

一:在資料湖中,更新通常是以檔案的變更來實作的,微批處理的時延越低,儲存的時間越久,小檔案的個數也就越多,無論是對檔案管理服務如​

​namenode​

​​ 的壓力,還是對讀取資料的性能都會越來越低。一般我們數倉中的資料都是永久儲存或者配置 ​

​TTL​

​ 隻儲存近幾年的資料,如果在資料湖中資料的生命周期也儲存如此久,小檔案數量會暴增,産生其它意想不到的事故。 是以資料湖中儲存近一周的資料變更即可,查詢一周前的鏡像資料可以到離線數倉查詢。

二:基于成本的考慮,大家都知道,在相同業務下,通常時延要求越低的所消耗的資源成本越高即:實時>近實時>離線。對于一些離線計算就能滿足的業務場景,直接進行離線計算即可,沒必要進行實時或者近實時計算。

架構選型

該架構依賴以下幾個資料庫的特性

  • ​ACID​

    ​ :作為一個存儲必須要有,無需多言
  • 增量查詢 :既然要做增量計算,為了讀取和寫入的性能,那麼不可能再像傳統數倉那樣,掃描所有資料檔案再進行 ​

    ​insert overwrite​

  • 高效 ​

    ​upsert​

    ​:要實作近實時,對upsert性能要求比較高
  • 時間旅行:為了保證離線數倉做snapshot的幂等,該功能需要支援

經過對 ​

​delta lake​

​​、​

​iceberg​

​​ 和 ​

​hudi​

​​ 的對比,我們最終選擇了 ​

​hudi​

​​,最新的對比可檢視文末 ​

​onehouse​

​​ 的對比。得益于 ​

​timeline​

​​ 的 ​

​MVCC​

​​ 設計,​

​HUDI​

​​ 實作了 ​

​ACID​

​​、增量查詢、時間旅行。同時 ​

​mor​

​​、​

​cow​

​​ 兩種不同的表格式來支援寫多讀少、和讀多寫少的場景。不同的索引類型(​

​bucket index​

​​, ​

​bloom index​

​​, ​

​hbase index​

​​等)實作了高效的 ​

​upsert​

​,恰當的解決了我們的所有痛點。

HUDI 表CRUD

建表

如下建立一個表類型為 ​

​merge on read​

​​,主鍵為 ​

​id​

​​,預合并字段為​

​gmt_modified​

​​,并由 ​

​dt、hour​

​ 二級分區的建表語句

create table bi_dw.dim_test (
  id bigint,
  name string,
  gmt_modified bigint,
  dt string,
  hour string
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'gmt_modified'
 )
partitioned by (dt, hour)
location '/tmp/hudi/dim_test';      

對于非 ​

​ods​

​​ 層的 ​

​hudi​

​​ 表我們通常以如上的方式建表。而對于 ​

​ods​

​​ 層,由于需要讀取其它存儲的資料(​

​binglog​

​​、​

​file​

​​、​

​mq​

​​),目前無法像 ​

​flink​

​​ 那樣建立一個 ​

​source​

​​ 表來做,是以需要開發一個 ​

​spark/flink​

​​ 程式來做 ​

​hudi​

​​的寫入(也可以使用官方提供的 ​

​DeltaStreamer​

​​),然後周期排程即可。對于已經存在資料的 ​

​ods​

​ 表的建表語句:

create table bi_ods.ods_table using hudi location '/tmp/hudi/ods_table';      

建表時,我們無須指定表的配置和 ​

​schema​

​​,​

​hudi​

​​ 會自動根據表 ​

​location​

​​下的元資訊識别表的配置和​

​schema​

​.

資料讀取

之前無法全部用 ​

​sql​

​​ 來做的主要原因是無法通過 ​

​spark sql​

​​ 來對 ​

​hudi​

​​ 表進行增量讀,快照讀。但是在 ​

​HUDI-3161​

​​上,​

​ForwardXu​

​​ 大佬在 ​

​spark sql​

​​ 上支援了 ​

​call​

​​ 指令。通過 ​

​call​

​​ 指令我們可以在 ​

​spark-sql console​

​​ 來完成一些運維操作,比如執行 ​

​compact​

​​,執行 ​

​clean​

​​,檢視送出記錄等等。在這裡我看到了讀取 ​

​hudi​

​​ 增量資料、快照資料的契機,于是新增了一個​

​copy_to_table​

​​ 指令,該指令可以把 ​

​hudi​

​​ 表的資料以增量、快照、讀優化等讀取方式複制資料到 ​

​hive​

​​ 表,于是于我們在 ​

​spark-sql​

​​ 裡面讀取 ​

​hudi​

​ 表資料就變成了如下方式:

# read snapshot data from hudi table
call copy_to_table(table=>'$tableName',new_table=>'$viewName',query_type=>'snapshot',as_of_instant=>'20221018055647688')
select * from $viewName

# read incremental data from hudi table
call copy_to_table(table=>'$tableName',new_table=>'$viewName',query_type=>'incremental',begin_instance_time=>'20221018055647688')
select * from $viewName

# read read_optimized data from hudi table 
call copy_to_table(table=>'$tableName',new_table=>'$viewName',query_type=>'read_optimized')
select * from $viewName      

資料插入

如果能保證資料是新增的,那麼直接執行 ​

​insert​

​​ 語句即可。但是 ​

​hudi​

​​ 表中如果已經存在相同主鍵的資料,将會導緻資料重複,建議使用下面的 ​

​merge​

​ 語句插入。

插入資料到非分區表

insert into hudi_tbl select 1, 'a1', 20;      

插入資料到動态分區

insert into hudi_tbl_part partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;      

插入資料到指定分區

insert into hudi_tbl_part partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;      

資料删除

​hudi​

​ 允許删除使用者指定的資料,文法如下:

DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]

delete from hudi_mor_tbl where id % 2 = 0;      

資料更新

update hudi_mor_tbl set price = price * 2, gmt_modified = 1111 where id = 1;

update hudi_cow_pt_tbl set gmt_modified = 1001 where name = 'a1';      

​hudi​

​​ 允許使用任意條件來進行資料的更新,但是要求預合并的字段(本文為​

​gmt_modified​

​)一定要更新,否則将會報錯。

除了使用 ​

​update​

​​ 語句外,​

​hudi​

​​ 還支援更強大的 ​

​merge​

​ 語句。該語句可以根據不同的條件對目标表資料進行更新、删除、和新增。

create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12');

merge into hudi_cow_pt_tbl as target
using (
  select id, name, '1000' as ts, flag, dt, hh from merge_source2
) as source
on target.id = source.id
when matched and flag != 'delete' then
 update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
 insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;      

其中 ​

​target​

​​ 作為目标表,​

​merge_source2​

​​ 為增量資料,​

​join​

​​ 的條件為​

​target.id = source.id​

​​.當滿足條件時,判斷 ​

​flag​

​​ 如果不等于 ​

​delete​

​​,則進行資料的更新,​

​flag​

​​ 如果等于 ​

​delete​

​​ 則删除資料。如果 ​

​target.id = source.id​

​ 條件不滿足,即對于不存在目标表的增量資料直接新增即可

近實時排程

大資料離線排程平台基本都支援定時排程、依賴排程、自依賴排程等排程方式,除此之外在腳本開發時還支援一些内部時間表達式,比如 ​

​hera​

​ 排程系統的時間表達式為:

select * from xx_part where dt = ${zdt.addDay(-1).format("yyyyMMdd")}

select * from xx_part where dt = current_date();      

表達式的 ​

​zdt​

​​ 對象為目前任務的排程時間,該表達式的結果為排程時間減一天,并将日期格式化為 ​

​yyyyMMdd​

​​ 格式,此表達式在數倉做 ​

​T-1​

​​ 的離線計算時經常用到。而下面的 ​

​current_date​

​ 為任務運作時時間,在任務延遲執行的情況下會導緻業務計算出錯,誰用誰坑。

說到這裡了,在這裡就簡單解釋下排程時間,在排程系統中,一般分為業務時間、排程時間、觸發時間。

  • 排程時間:為排程實際應該被觸發的時間,每天 ​

    ​0​

    ​​ 點的任務,會生成一個 ​

    ​0​

    ​​ 點的排程時間,該任務會在 ​

    ​0​

    ​ 點進入任務隊列,但是并不表示計算業務已經開始運作。即使任務的運作時間已經延遲,排程時間始終不變。
  • 業務時間:計算的業務資料所在的時間,通常由排程時間計算而來,比如​

    ​T-1​

    ​​ 的計算,就是以排程時間減 ​

    ​1​

    ​ 天作為業務時間,計算業務時間指定的分區資料。
  • 觸發時間:任務實際開始運作的時間,大資料的離線計算對于排程時間的精确性的要求并沒有那麼高,也就是說即使我配置了一個 ​

    ​0​

    ​​ 點計算的任務,但是可能因為資源等問題導緻任務阻塞 ​

    ​10​

    ​​ 分鐘後才運作也沒有關系,而這裡的 ​

    ​00:10​

    ​ 分就是觸發時間。

正是排程時間這種不變的特性,是以在排程系統中,我們通常隻使用排程時間進行業務時間的計算,而不是以觸發時間。

在此貼一下近實時排程的依賴關系:

基于 Apache Hudi 和 Apache Spark Sql 的近實時數倉架構分享

我們在離線排程平台上要實作近實時數倉的計算,任務的開始節點為一個每 ​

​1-30​

​ 分鐘執行一次的定時任務,下遊任務根據業務場景以依賴排程進行計算,最主要的就是如何通過排程的方式做增量計算。

近實時數倉的所有表均為 ​

​hudi​

​​ 表,所有的計算均為增量計算,以上圖的​

​dwd_1​

​ 任務舉例子,我們讀取上遊表的增量資料時可以通過以下表達式讀取

call copy_to_table(table=>'dim_1',new_table=>'dim_1_view',query_type=>'snapshot',as_of_instant=>'${zdt.addMinutes(-10).format("yyyyMMddHHmmssSSS")}')
call copy_to_table(table=>'ods_2',new_table=>'ods_2_view',query_type=>'incremental',begin_instance_time=>'${zdt.addMinutes(-10).format("yyyyMMddHHmmssSSS")}')

with source as (
select id,name,price,gmt_modified from ods_2_view 
left join dim_1_view 
on dim_1_view.id = ods_2_view.id
) 
merge into dwd_1 as target
using  source on target.id=source.id
when matched  then update set *
when not matched then insert *      

​${zdt.addMinutes(-10).format("yyyyMMddHHmmssSSS")}​

​​ 表達式将任務的排程時間減去 ​

​10​

​​ 分鐘并格式化為 ​

​yyyyMMddHHmmssSSS​

​​ 格式作為其業務時間,表示 ​

​M-10​

​ 的計算。

是以前兩行的含義為:分别複制 ​

​dim_1​

​​ 十分鐘前的快照資料和 ​

​ods_2​

​​ 最近 ​

​10​

​​ 分鐘的增量資料到 ​

​dim_1_view​

​​ 和 ​

​ods_2_view​

​。

然後注冊一個 ​

​source​

​​ 視圖,該試圖由 ​

​ods_2_view​

​​ 和 ​

​dim_1_view​

​​ 通過 ​

​id​

​​ 關聯得到,然後将 ​

​source​

​​ 視圖的資料通過 ​

​merge into​

​​ 語句寫入到 ​

​dwd_1​

​​ 表。該文法通過 ​

​source.id = target.id​

​​ 進行關聯,如果能夠關聯到,更新 ​

​dwd_1​

​​ 表的該行資料。如果關聯不到直接插入 ​

​source​

​ 視圖的該行資料。

尾語

通過本篇文章叙述了使用 ​

​hudi​

​​、​

​spark sql​

​ 實作近實時數倉的架構方式。還有很多不完美的地方,需要繼續完善,抛磚引玉,為大家提供一份可以參考的方案。

實際上 ​

​copy_to_table​

​​ 需要将 ​

​hudi​

​​ 表資料 ​

​copy​

​​ 一份落盤,是比較耗時的。是以我又新增了一個 ​

​copy_to_temp_view​

​​ 的指令,該指令會将 ​

​hudi​

​​ 表注冊為 ​

​spark​

​​ 的臨時視圖表,節省了落盤的時間,預計會在 ​

​0.13.0​

​ 版本能夠使用。

注:以上使用的 ​

​hudi​

​​ 版本為 ​

​0.12.1​

​​, ​

​spark​

​​ 版本為 ​

​3.2.2​

本文引用的連結:

[1]: Apache Hudi 文檔 https://hudi.apache.org

[2]: hera 離線排程系統 https://github.com/scxwhite/hera

[3]: Apache Hudi vs Delta Lake vs Apache Iceberg - Lakehouse Feature Comparison https://www.onehouse.ai/blog/apache-hudi-vs-delta-lake-vs-apache-iceberg-lakehouse-feature-comparison

[4]: HUDI-3161 call produce command pr https://github.com/apache/hudi/pull/4535

繼續閱讀