前言
在大資料的計算場景中,根據資料的延遲情況,資料的時效性一般分為離線、準實時、實時。離線計算一般是以天(
T
)為界限,比如離線場景最多的就是
T-1
計算,也就是今天計算昨天産生的資料。準實時計算一般以小時(
H
)為界限,比如
H-1
的計算,即目前小時處理上個小時的資料,當然某些業務場景下也有(
0.5H-1
)計算的存在。而實時計算一般是以秒為界限,即資料的延遲最大粒度為秒級。對于離線和準實時計算,我們可以在排程系統中通過不同的排程周期實作,而實時計算通常需要一個常駐的任務來進行。對于相同業務,不同的時效要求下,任務計算所需要的資源成本通常為:離線 < 準實時 < 實時。 本文所說的近實時介于實時和準實時之間的一種實效要求,一般在
1
分鐘至
30
分鐘之間,在該時延的要求下,一般排程系統也能實作,但具體是使用常駐任務還是排程系統周期排程就看業務所能容忍的最大延遲時間與計算成本之間的權衡了。
近實時架構
在剛剛接觸資料湖的概念以及深入了解其各種特性時,曾
yy
過一種如下的數倉架構。但是直到最近基礎設施才滿足以該方式實作。在之前,這套架構如果想讓數倉/數開同學用起來,成本有些太高。對于離線數倉,為了代碼的可讀性和維護性,我們讓數倉直接在排程平台的界面上使用
sql
開發。而近實時數倉,由于一些架構的限制,還無法全部使用
sql
來實作,需要配合使用
java/scala
代碼來進行開發,然後把
jar
包釋出到叢集上才能被排程執行。由于無法自動對
jar
包進行中繼資料的解析,任務上下遊依賴配置時還需要人為選擇,對代碼的開發、維護和釋出增加了困難。是以這種方法一直沒有推廣開來,隻能由專業的幾個人來做,直到最近,看到了曙光。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiQDOxEzX3xCZlhXam9VbsUmepNXZy9CXwJWZ3xCdh1mcvZ2Lc1zaHRGcWdUYuVzVa9GczoVdG1mWfVGc5RHLwIzX39GZhh2csATMflHLwEzX4xSZz91ZsAzMfRHLGZkRGZkRfJ3bs92YskmNhVTYykVNQJVMRhXVEF1X0hXZ0xiNx8VZ6l2cssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL3YTNzgDN4EDO4MjY1UjNzYzXyMDNzIDM1AzLcFTMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
在數倉中,我們根據業務要求的資料時效性劃分為離線數倉和近實時數倉,其中離線數倉(離線、準實時計算)由
spark sql
實作,近實時數倉由資料湖增量計算實作。在我們内部無論是增量計算還是離線計算,全部在
dag
離線排程平台
hera
上進行周期排程。
而對于不同層級,也有不同的計算方式
-
層ods
- 近實時數倉:由一個
或者 spark
任務來微批同步 flink
、日志檔案、binlog
等實時産生的資料,内部使用的是 kafka
來執行,結構化流支援 spark structured streaming
的觸發方式,在排程系統周期排程時能自動從上次的 Trigger once
位置繼續消費checkpoint
- 離線數倉:為了降低對其它存儲的壓力,對于一些已經存在于近實時數倉的資料,每天零點以快照的方式存儲一份資料到離線數倉,無需重複抽取。對于一些不在近實時數倉的資料直連相應資料源,使用
、sqoop
等方式離線同步即可dataX
-
層dw
- 近實時數倉: 直接由近實時數倉
層增量計算得出ods
- 離線數倉:對于一些離線數倉需要的,近實時數倉存在的表每天零點仍然以快照的方式同步到離線數倉,一些隻有離線需求的業務直接離線計算即可
-
層ads
- 近實時數倉:和
層類似。dw
- 離線數倉:和
層類似。dw
最終,無論是離線數倉還是近實時數倉的資料以直連、導出或者接口的方式提供給外部。
實際上這裡有一個問題,為何不能所有的離線計算改為近實時計算呢?考慮主要有兩個
一:在資料湖中,更新通常是以檔案的變更來實作的,微批處理的時延越低,儲存的時間越久,小檔案的個數也就越多,無論是對檔案管理服務如
namenode
的壓力,還是對讀取資料的性能都會越來越低。一般我們數倉中的資料都是永久儲存或者配置
TTL
隻儲存近幾年的資料,如果在資料湖中資料的生命周期也儲存如此久,小檔案數量會暴增,産生其它意想不到的事故。 是以資料湖中儲存近一周的資料變更即可,查詢一周前的鏡像資料可以到離線數倉查詢。
二:基于成本的考慮,大家都知道,在相同業務下,通常時延要求越低的所消耗的資源成本越高即:實時>近實時>離線。對于一些離線計算就能滿足的業務場景,直接進行離線計算即可,沒必要進行實時或者近實時計算。
架構選型
該架構依賴以下幾個資料庫的特性
-
:作為一個存儲必須要有,無需多言ACID
- 增量查詢 :既然要做增量計算,為了讀取和寫入的性能,那麼不可能再像傳統數倉那樣,掃描所有資料檔案再進行
insert overwrite
- 高效
:要實作近實時,對upsert性能要求比較高upsert
- 時間旅行:為了保證離線數倉做snapshot的幂等,該功能需要支援
經過對
delta lake
、
iceberg
和
hudi
的對比,我們最終選擇了
hudi
,最新的對比可檢視文末
onehouse
的對比。得益于
timeline
的
MVCC
設計,
HUDI
實作了
ACID
、增量查詢、時間旅行。同時
mor
、
cow
兩種不同的表格式來支援寫多讀少、和讀多寫少的場景。不同的索引類型(
bucket index
,
bloom index
,
hbase index
等)實作了高效的
upsert
,恰當的解決了我們的所有痛點。
HUDI 表CRUD
建表
如下建立一個表類型為
merge on read
,主鍵為
id
,預合并字段為
gmt_modified
,并由
dt、hour
二級分區的建表語句
create table bi_dw.dim_test (
id bigint,
name string,
gmt_modified bigint,
dt string,
hour string
) using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
preCombineField = 'gmt_modified'
)
partitioned by (dt, hour)
location '/tmp/hudi/dim_test';
對于非
ods
層的
hudi
表我們通常以如上的方式建表。而對于
ods
層,由于需要讀取其它存儲的資料(
binglog
、
file
、
mq
),目前無法像
flink
那樣建立一個
source
表來做,是以需要開發一個
spark/flink
程式來做
hudi
的寫入(也可以使用官方提供的
DeltaStreamer
),然後周期排程即可。對于已經存在資料的
ods
表的建表語句:
create table bi_ods.ods_table using hudi location '/tmp/hudi/ods_table';
建表時,我們無須指定表的配置和
schema
,
hudi
會自動根據表
location
下的元資訊識别表的配置和
schema
.
資料讀取
之前無法全部用
sql
來做的主要原因是無法通過
spark sql
來對
hudi
表進行增量讀,快照讀。但是在
HUDI-3161
上,
ForwardXu
大佬在
spark sql
上支援了
call
指令。通過
call
指令我們可以在
spark-sql console
來完成一些運維操作,比如執行
compact
,執行
clean
,檢視送出記錄等等。在這裡我看到了讀取
hudi
增量資料、快照資料的契機,于是新增了一個
copy_to_table
指令,該指令可以把
hudi
表的資料以增量、快照、讀優化等讀取方式複制資料到
hive
表,于是于我們在
spark-sql
裡面讀取
hudi
表資料就變成了如下方式:
# read snapshot data from hudi table
call copy_to_table(table=>'$tableName',new_table=>'$viewName',query_type=>'snapshot',as_of_instant=>'20221018055647688')
select * from $viewName
# read incremental data from hudi table
call copy_to_table(table=>'$tableName',new_table=>'$viewName',query_type=>'incremental',begin_instance_time=>'20221018055647688')
select * from $viewName
# read read_optimized data from hudi table
call copy_to_table(table=>'$tableName',new_table=>'$viewName',query_type=>'read_optimized')
select * from $viewName
資料插入
如果能保證資料是新增的,那麼直接執行
insert
語句即可。但是
hudi
表中如果已經存在相同主鍵的資料,将會導緻資料重複,建議使用下面的
merge
語句插入。
插入資料到非分區表
insert into hudi_tbl select 1, 'a1', 20;
插入資料到動态分區
insert into hudi_tbl_part partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;
插入資料到指定分區
insert into hudi_tbl_part partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;
資料删除
hudi
允許删除使用者指定的資料,文法如下:
DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
delete from hudi_mor_tbl where id % 2 = 0;
資料更新
update hudi_mor_tbl set price = price * 2, gmt_modified = 1111 where id = 1;
update hudi_cow_pt_tbl set gmt_modified = 1001 where name = 'a1';
hudi
允許使用任意條件來進行資料的更新,但是要求預合并的字段(本文為
gmt_modified
)一定要更新,否則将會報錯。
除了使用
update
語句外,
hudi
還支援更強大的
merge
語句。該語句可以根據不同的條件對目标表資料進行更新、删除、和新增。
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12');
merge into hudi_cow_pt_tbl as target
using (
select id, name, '1000' as ts, flag, dt, hh from merge_source2
) as source
on target.id = source.id
when matched and flag != 'delete' then
update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;
其中
target
作為目标表,
merge_source2
為增量資料,
join
的條件為
target.id = source.id
.當滿足條件時,判斷
flag
如果不等于
delete
,則進行資料的更新,
flag
如果等于
delete
則删除資料。如果
target.id = source.id
條件不滿足,即對于不存在目标表的增量資料直接新增即可
近實時排程
大資料離線排程平台基本都支援定時排程、依賴排程、自依賴排程等排程方式,除此之外在腳本開發時還支援一些内部時間表達式,比如
hera
排程系統的時間表達式為:
select * from xx_part where dt = ${zdt.addDay(-1).format("yyyyMMdd")}
select * from xx_part where dt = current_date();
表達式的
zdt
對象為目前任務的排程時間,該表達式的結果為排程時間減一天,并将日期格式化為
yyyyMMdd
格式,此表達式在數倉做
T-1
的離線計算時經常用到。而下面的
current_date
為任務運作時時間,在任務延遲執行的情況下會導緻業務計算出錯,誰用誰坑。
說到這裡了,在這裡就簡單解釋下排程時間,在排程系統中,一般分為業務時間、排程時間、觸發時間。
- 排程時間:為排程實際應該被觸發的時間,每天
點的任務,會生成一個 0
點的排程時間,該任務會在 0
點進入任務隊列,但是并不表示計算業務已經開始運作。即使任務的運作時間已經延遲,排程時間始終不變。0
- 業務時間:計算的業務資料所在的時間,通常由排程時間計算而來,比如
的計算,就是以排程時間減 T-1
天作為業務時間,計算業務時間指定的分區資料。1
- 觸發時間:任務實際開始運作的時間,大資料的離線計算對于排程時間的精确性的要求并沒有那麼高,也就是說即使我配置了一個
點計算的任務,但是可能因為資源等問題導緻任務阻塞 0
分鐘後才運作也沒有關系,而這裡的 10
分就是觸發時間。00:10
正是排程時間這種不變的特性,是以在排程系統中,我們通常隻使用排程時間進行業務時間的計算,而不是以觸發時間。
在此貼一下近實時排程的依賴關系:
我們在離線排程平台上要實作近實時數倉的計算,任務的開始節點為一個每
1-30
分鐘執行一次的定時任務,下遊任務根據業務場景以依賴排程進行計算,最主要的就是如何通過排程的方式做增量計算。
近實時數倉的所有表均為
hudi
表,所有的計算均為增量計算,以上圖的
dwd_1
任務舉例子,我們讀取上遊表的增量資料時可以通過以下表達式讀取
call copy_to_table(table=>'dim_1',new_table=>'dim_1_view',query_type=>'snapshot',as_of_instant=>'${zdt.addMinutes(-10).format("yyyyMMddHHmmssSSS")}')
call copy_to_table(table=>'ods_2',new_table=>'ods_2_view',query_type=>'incremental',begin_instance_time=>'${zdt.addMinutes(-10).format("yyyyMMddHHmmssSSS")}')
with source as (
select id,name,price,gmt_modified from ods_2_view
left join dim_1_view
on dim_1_view.id = ods_2_view.id
)
merge into dwd_1 as target
using source on target.id=source.id
when matched then update set *
when not matched then insert *
${zdt.addMinutes(-10).format("yyyyMMddHHmmssSSS")}
表達式将任務的排程時間減去
10
分鐘并格式化為
yyyyMMddHHmmssSSS
格式作為其業務時間,表示
M-10
的計算。
是以前兩行的含義為:分别複制
dim_1
十分鐘前的快照資料和
ods_2
最近
10
分鐘的增量資料到
dim_1_view
和
ods_2_view
。
然後注冊一個
source
視圖,該試圖由
ods_2_view
和
dim_1_view
通過
id
關聯得到,然後将
source
視圖的資料通過
merge into
語句寫入到
dwd_1
表。該文法通過
source.id = target.id
進行關聯,如果能夠關聯到,更新
dwd_1
表的該行資料。如果關聯不到直接插入
source
視圖的該行資料。
尾語
通過本篇文章叙述了使用
hudi
、
spark sql
實作近實時數倉的架構方式。還有很多不完美的地方,需要繼續完善,抛磚引玉,為大家提供一份可以參考的方案。
實際上
copy_to_table
需要将
hudi
表資料
copy
一份落盤,是比較耗時的。是以我又新增了一個
copy_to_temp_view
的指令,該指令會将
hudi
表注冊為
spark
的臨時視圖表,節省了落盤的時間,預計會在
0.13.0
版本能夠使用。
注:以上使用的
hudi
版本為
0.12.1
,
spark
版本為
3.2.2
本文引用的連結:
[1]: Apache Hudi 文檔 https://hudi.apache.org
[2]: hera 離線排程系統 https://github.com/scxwhite/hera
[3]: Apache Hudi vs Delta Lake vs Apache Iceberg - Lakehouse Feature Comparison https://www.onehouse.ai/blog/apache-hudi-vs-delta-lake-vs-apache-iceberg-lakehouse-feature-comparison
[4]: HUDI-3161 call produce command pr https://github.com/apache/hudi/pull/4535