作者簡介:張寬天,畢業于華中科技大學,先後在華為、閱文集團從事資料開發方面工作,2019年加入流利說,擔任資料工程師,負責資料湖相關工作。
背景
流利說目前的離線計算任務中,大部分資料源都是來自于業務 DB,業務DB資料接入的準确性、穩定性和及時性,決定着下遊整個離線計算 pipeline 的準确性和及時性。同時,我們還有部分業務需求,需要對 DB 中的資料和 hive 中的資料做近實時的聯合查詢。
在引入阿裡雲 EMR Delta Lake 之前,我們通過封裝 DataX 來完成業務 DB 資料的接入,采用 Master-Slave 架構,Master 維護着每日要執行的 DataX 任務的中繼資料資訊,Worker 節點通過不斷的以搶占的方式擷取狀态為 init 和 restryable 的 DataX 任務來執行,直到當天的所有的 DataX 任務全都執行完畢為止。
架構圖大緻如下:

Worker 處理的過程如下:
對于近實時需求,我們是直接開一個從庫,配置 presto connector 去連接配接從庫,來實作業務 BD 中的資料和 hive 中的資料做近實時的聯合查詢需求。
這種架構方案的優點是簡單,易于實作。但是随着資料量也來越多,缺點也就逐漸暴露出來了:
性能瓶頸: 随着業務的增長,這種通過 SELECT 的方式接入資料的性能會越來越差,受 DB 性能瓶頸影響,無法通過增加 Worker 節點的方式來緩解。
規模大的表隻能通過從庫來拉取,造成資料接入的成本越來越高。
無法業務滿足近實時的查詢需求,近實時查詢隻能通過從庫的方式查詢,進一步加大了接入的成本。
為了解決這些問題,我們将目光聚焦到了 CDC實時接入的方案上。
技術方案選型
對于 CDC實時接入的方案,目前業内主要有以下幾種: CDC + Merge 方案、CDC + Hudi、CDC + Delta Lake 及 CDC + Iceberg 等幾種方案。其中,CDC + Merge 方案是在是在資料湖方案出現之前的做法,這種方案能節省DB從庫的成本,但是無法滿足業務近實時查詢的需求等功能,是以最開始就 pass 掉了,而 Iceberg 在我們選型之初,還不夠成熟,業界也沒有可參考的案列,是以也被 pass 掉了,最後我們是在 CDC + Hudi 和 CDC + Delta Lake 之間選擇。
在選型時,Hudi 和 Delta Lake 兩者的功能上都是大同小異的,是以我們主要是從這幾方案來考慮的: 穩定性、小檔案合并、是否支援SQL、雲廠商支援程度、語言支援程度等幾個方面來考慮。
基于以上名額,加上我們整個資料平台都是基于阿裡雲 EMR 搭建的,選擇 Delta Lake 的話,會省掉大量的适配開發工作,是以我們最終選擇了 CDC + Delta Lake 的方案。
整體架構
總體架構圖
整體的架構如上圖所示。我們接入的資料會分為兩部分,存量曆史資料和新資料,存量曆史資料使用 DataX 從 MySQL 中導出,存入 OSS 中,新資料使用 Binlog 采集存入 Delta Lake 表中。每日淩晨跑 ETL 任務前,先對曆史資料和新資料做 Merge 操作,ETL 任務使用 Merge 之後的資料。
Delta Lake 資料接入
在 Binlog 實時采集方面,我們采用了開源的 Debezium ,負責從 MySQL 實時拉取 Binlog 并完成适當解析,每張表對應一個 Topic ,分庫分表合并為一個 Topic 分發到 Kafka 上供下遊消費。Binlog 資料接入到 Kafka 之後,我們需要建立 Kafka Source 表指向對應的 Kafka Topic 中, 表的格式為:
CREATE TABLE
kafka_{db_name}_{table_name}
(
key
BINARY,
value
topic
STRING,
partition
INT,
offset
BIGINT,
timestamp
TIMESTAMP,
timestampType
INT)
USING kafka
OPTIONS (
kafka.sasl.mechanism
'PLAIN',
subscribe
'cdc-{db_name}-{table_name}',
serialization.format
'1',
kafka.sasl.jaas.config
'*****(redacted)',
kafka.bootstrap.servers
'{bootstrap-servers}',
kafka.security.protocol
'SASL_PLAINTEXT'
)
我們主要用到的字段是 value 和 offset ,其中 value 的格式如下:
{
"payload": {
"before": {
db記錄變更前的schema及内容,op=c時,為null
},
"after": {
db記錄變更後的schema及内容,op=d時,為null
"source": {
ebezium配置資訊
"op": "c",
"ts_ms":
}
同時建立 Delta Lake 表,Location 指向 HDFS 或者 OSS ,表結構為:
CREATE TABLE IF NOT EXISTS delta.delta_{dbname}{table_name}(
{row_key_info},
ts_ms bigint,
json_record string,
operation_type string,
offset bigint
USING delta
LOCATION '------/delta/{db_name}.db/{table_name}'
其中 row_key_info 為 Delta Lake 表的唯一索引字段,對于單庫單表而言,row_key_info 為 mysql 表的 primary key 字段 eg: id long,對于分庫分表及分執行個體分庫分表而言,row_key_info 為分庫分表的字段和單表裡primary key 字段組成,eg: 以 user_id 為分表字段,每張表裡以 id 為 primary key , 那麼對應的 row_key_info 為 id long, user_id long。
StreamingSQL 處理 Kafka 中的資料,我們主要是提取 Kafka Source 表中的 offset、value 字段及 value 字段中的 CDC 資訊如: op、ts_ms 及 payload 的 after 和 before 字段。StreamingSQL 中,我們采用 5min 一個 mini batch,主要是考慮到 mini batch 太小會産生很多小檔案,處理速度會越來越慢,也會影響讀的性能,太大了又沒法滿足近實時查詢的要求。而 Delta Lake 表,我們不将 after 或者 before 字段解析出來,主要是考慮到我們業務表 的 schema 經常變更,業務表 schema 一變更就要去修複一遍資料,成本比較大。在 StreamingSQL 處理過程中,對于 op=’c’ 的資料我們會直接 insert 操作,json_record 取 after 字段。對于 op=’u’ 或者 op=’d’ 的資料,如果 Delta Lake 表中不存在,那麼執行 insert 操作, 如果存在,那麼執行 update 操作;json_record 的指派值,op=’d’,json_record 取 before 字段,op=’u’,jsonrecord 取 after 字段。保留 op=’d’ 的字段,主要是考慮到删除的資料可能在存量曆史表中,如果直接删除的話,淩晨 merge 的資料中,存在存量曆史表中的資料就不會被删除。
整個 StreamingSQL 的處理大緻如下:
CREATE SCAN incremental{dbname}{tablename} on kafka{dbname}{table_name} USING STREAM
OPTIONS(
startingOffsets='earliest',
maxOffsetsPerTrigger='1000000',
failOnDataLoss=false
);
CREATE STREAM job
checkpointLocation='------/delta/{db_name}.db/{table_name}checkpoint',
triggerIntervalMs='300000'
MERGE INTO delta.delta{dbname}{table_name} as target
USING (
SELECT * FROM (
SELECT ts_ms, offset, operation_type, {key_column_sql}, coalesce(after_record, before_record) as after_record, row_number() OVER (PARTITION BY {key_column_partition_sql} ORDER BY ts_ms DESC, offset DESC) as rank
FROM (
SELECT ts_ms, offset, operation_type, before_record, after_record, {key_column_include_sql}
FROM ( SELECT get_json_object(string(value), '$.payload.op') as operation_type,
get_json_object(string(value), '$.payload.before') as before_record,
get_json_object(string(value), '$.payload.after') as after_record,
get_json_object(string(value), '$.payload.ts_ms') as tsms,
offset
FROM incremental{dbname}{table_name}
) binlog
) binlog_wo_init ) binlog_rank where rank = 1) as source
ON {key_column_condition_sql}
WHEN MATCHED AND (source.operation_type = 'u' or source.operation_type='d') THEN
UPDATE SET {set_key_column_sql}, ts_ms=source.ts_ms, json_record=source.after_record, operation_type=source.operation_type, offset=source.offset
WHEN NOT MATCHED AND (source.operation_type='c' or source.operation_type='u' or source.operation_type='d') THEN
INSERT ({inser_key_column_sql}, ts_ms, json_record, operation_type, offset) values ({insert_key_column_value_sql}, source.ts_ms, source.after_record, source.operation_type, source.offset);
執行完 StreamingSQL 之後,就會生成如下格式的資料:
其中 part-xxxx.snappy.parquet 儲存的是 DeltaLake 表的資料檔案,而 _deltalog 目錄下儲存的是 DeltaLake 表的中繼資料,包括如下:
其中 xxxxxxxx 表示的是版本資訊,xxxxxxxx.json 檔案裡儲存的是有效的 parquet 檔案資訊,其中 add 類型的為有效的 parquet 檔案, remove 為無效的 parquet 檔案。
Delta Lake 是支援 Time travel 的,但是我們 CDC 資料接入的話,用不到資料復原政策,如果多版本的資料一直保留會給我們的存儲帶來一定的影響,是以我們要定期删除過期版本的資料,目前是僅保留2個小時内的版本資料。同時,Delta Lake 不支援自動合并小檔案的功能,是以我們還需要定期合并小檔案。目前我們的做法是,每小時通過 OPTIMIZE 和 VACCUM 來做一次合并小檔案操作及清理過期資料檔案操作:
optimize delta{dbname}{tablename};
set spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM delta{dbname}{table_name} RETAIN 1 HOURS;
由于目前 Hive 和 Presto 無法直接讀取 Spark SQL 建立的 Delta Lake 表,但是監控及近實時查詢需求,需要查詢 Delta Lake 表,是以我們還建立了用于 Hive 和 Presto 表查詢的。
Delta Lake 資料與存量資料 Merge
由于 Delta Lake 的資料我們僅接入新資料,對于存量曆史資料我們是通過DataX 一次性導入的,加上 Delta Lake 表 Hive 無法直接查詢,是以每日淩晨我們需要對這兩部分資料做一次 merge 操作,寫入到新的表中便于 Spark SQL 和 Hive 統一使用。這一子產品的架構大緻如下:
圖檔
每日淩晨0點前,調用 DeltaService API ,根據 Delta Lake 任務的配置自動生成 merge任務 的 task 資訊、spark-sql 腳本及 對應的 Airflow DAG 檔案。
merge 任務的 task 資訊主要包括如下資訊:
自動生成 Merge 腳本,主要是從 Delta Lake 任務的配置中擷取 mysql 表的schema 資訊,删掉曆史的 Hive 表,再根據 schema 資訊重新建立 Hive 外部表,再根據新的 schema 從Delta Lake表的 json_record 字段和曆史存量資料表中擷取對應的字段值做 union all 操作,缺失值采用mysql 的預設值, union 之後,再根據 row_key 進行分組,按 ts_ms 排序取第一條,同時取出operation_type=’d’ 的資料。整體如下:
CREATE DATABASE IF NOT EXISTS {db_name} LOCATION '------/delta/{db_name}.db';
DROP TABLE IF EXISTS
{db_name}
.
{table_name}
;
CREATE TABLE IF NOT EXISTS
{db_name}
{table_name}
{table_column_infos}
STORED AS PARQUET
LOCATION '------/delta/{db_name}.db/{table_name}/data_date=${{data_date}}';
INSERT OVERWRITE TABLE
{db_name}
{table_name}
SELECT {table_columns}
FROM ( SELECT {table_columns}, _operation_type, row_number() OVER (PARTITION BY {row_keys} ORDER BY ts_ms DESC) as ranknum
SELECT {delta_columns}, operation_type as _operation_type, tsms
FROM delta{dbname}{table_name}
UNION ALL
SELECT {hive_columns}, 'c' as _operation_type, 0 as ts_ms
FROM
{db_name}
{table_name}_delta_history
) union_rank
) ranked_data
WHERE ranknum=1
AND _operation_type <> 'd'
淩晨0點之後,Airflow 會根據 Airflow DAG 檔案自動排程執行 merge 的Spark SQL 腳本,腳本執行成功後,更新 merge task 的狀态為 succeed ,Airflow 的 ETL DAG 會根據merge task 的狀态自動排程下遊的 ETL 任務。
Delta Lake 資料監控
對于 Delta Lake 資料的監控,我們主要是為了兩個目的:監控資料是否延遲及監控資料是否丢失,主要是在 MySQL 與 Delta Lake 表之間及 CDC 接入過來的 Kafka Topic 與 Delta Lake 表之間。
CDC 接入過來的 Kafka Topic 和 Delta Lake 表之間的延遲監控:我們是每15分鐘從 Kafka 的 Topic 中擷取每個 Partition 的最大 offset 對應的 mysql 的 row_key 字段内容,放入監控的 MySQL 表 delta_kafka_monitor_info 中,再從 delta_kafka_monitor_info 中擷取上一周期的 row_key 字段内容,到 Delta Lake 表中查詢,如果查詢不到,說明資料有延遲或者丢失,發出告警。
MySQL 與 Delta Lake 之間的監控:我們有兩種,一種是探針方案,每15分鐘,從 MySQL 中擷取最大的 id,對于分庫分表,隻監控一張表的,存入 delta_mysql_monitor_info 中,再從 delta_mysql_monitor_info 中擷取上一周期的最大 id,到 Delta Lake 表中查詢,如果查詢不到,說明資料有延遲或者丢失,發出告警。另一種是直接 count(id),這種方案又分為單庫單表和分庫分表兩種,中繼資料儲存在 mysql 表 id_based_mysql_delta_monitor_info 中,主要包含 min_id、max_id、mysql_count 三個字段,對于單庫單表,也是每隔5分鐘,從 Delta Lake 表中擷取 min_id 和 max_id 之間的 count 值,跟 mysql_count 對比,如果小于 mysql_count 值說明有資料丢失或者延遲,發出告警。再從 mysql 中擷取 max(id) 和 max_id 與 max(id) 之間的 count 值,更新到 id_based_mysql_delta_monitor_info 表中。對于分庫分表的情況,根據分庫分表規則,生成每一張表對應的 id_based_mysql_delta_monitor_info 資訊,每半小時執行一遍監控,規則同單庫單表。
遇到的挑戰
業務表 schema 變更頻繁,Delta Lake 表如果直接解析 CDC 的字段資訊的話,如果不能及時發現并修複資料的話,後期修複資料的成本會較大,目前我們是不解析字段,等到淩晨 merge 的時候再解析。
随着資料量越來越大,StreamingSQL 任務的性能會越來越差。我們目前是 StreamingSQL 處理延遲,出現大量延遲告警後,将 Delta Lake 存量資料替換成昨日 merge 後的資料,再删掉 Delta Lake 表,删除 checkpoint 資料,從頭開始消費 KafkaSource 表的資料。降低 Delta Lake 表資料,進而緩解StreamingSQL 的壓力。
Hive 和 Presto 不能直接查詢 Spark SQL 建立的 Delta Lake 表,目前我們是建立支援 Hive 和 Presto 查詢的外部表來供 Hive 和 Presto 使用,但是這些表又無法通過 Spark SQL 查詢。是以上層 ETL 應用無法在不更改代碼的情況下,在 Hive 和 Spark SQL 及Presto 引擎之間自由切換。
帶來的收益
節省了 DB 從庫的成本,采用 CDC + Delta Lake 之後,我們的成本節省了近80%。
淩晨 DB 資料接入的時間成本大大降低,能夠確定所有非特殊要求的 DB 資料接入都能在1個小時内跑完。
後續規劃
StreamingSQL 任務随着 Delta Lake 表資料量越來越大,性能越來越差問題跟進。
推動能否解決 Spark SQL 建立的 Delta Lake 表,無法直接使用 Hive 和 Presto 查詢的問題。
歡迎對阿裡雲 EMR Delta Lake感興趣的朋友加入阿裡雲EMR釘釘群交流測試,群内會定期進行精品内容分享,測試請@揚流,釘釘群如下