天天看點

阿裡雲 EMR Delta Lake 在流利說資料接入中的架構和實踐背景技術方案選型遇到的挑戰帶來的收益後續規劃

作者簡介:張寬天,畢業于華中科技大學,先後在華為、閱文集團從事資料開發方面工作,2019年加入流利說,擔任資料工程師,負責資料湖相關工作。

背景

流利說目前的離線計算任務中,大部分資料源都是來自于業務 DB,業務DB資料接入的準确性、穩定性和及時性,決定着下遊整個離線計算 pipeline 的準确性和及時性。同時,我們還有部分業務需求,需要對 DB 中的資料和 hive 中的資料做近實時的聯合查詢。

在引入阿裡雲 EMR Delta Lake 之前,我們通過封裝 DataX 來完成業務 DB 資料的接入,采用 Master-Slave 架構,Master 維護着每日要執行的 DataX 任務的中繼資料資訊,Worker 節點通過不斷的以搶占的方式擷取狀态為 init 和 restryable 的 DataX 任務來執行,直到當天的所有的 DataX 任務全都執行完畢為止。

架構圖大緻如下:

阿裡雲 EMR Delta Lake 在流利說資料接入中的架構和實踐背景技術方案選型遇到的挑戰帶來的收益後續規劃

Worker 處理的過程如下:

阿裡雲 EMR Delta Lake 在流利說資料接入中的架構和實踐背景技術方案選型遇到的挑戰帶來的收益後續規劃

對于近實時需求,我們是直接開一個從庫,配置 presto connector 去連接配接從庫,來實作業務 BD 中的資料和 hive 中的資料做近實時的聯合查詢需求。

這種架構方案的優點是簡單,易于實作。但是随着資料量也來越多,缺點也就逐漸暴露出來了:

性能瓶頸: 随着業務的增長,這種通過 SELECT 的方式接入資料的性能會越來越差,受 DB 性能瓶頸影響,無法通過增加 Worker 節點的方式來緩解。

規模大的表隻能通過從庫來拉取,造成資料接入的成本越來越高。

無法業務滿足近實時的查詢需求,近實時查詢隻能通過從庫的方式查詢,進一步加大了接入的成本。

為了解決這些問題,我們将目光聚焦到了 CDC實時接入的方案上。

技術方案選型

對于 CDC實時接入的方案,目前業内主要有以下幾種: CDC + Merge 方案、CDC + Hudi、CDC + Delta Lake 及 CDC + Iceberg 等幾種方案。其中,CDC + Merge 方案是在是在資料湖方案出現之前的做法,這種方案能節省DB從庫的成本,但是無法滿足業務近實時查詢的需求等功能,是以最開始就 pass 掉了,而 Iceberg 在我們選型之初,還不夠成熟,業界也沒有可參考的案列,是以也被 pass 掉了,最後我們是在 CDC + Hudi 和 CDC + Delta Lake 之間選擇。

在選型時,Hudi 和 Delta Lake 兩者的功能上都是大同小異的,是以我們主要是從這幾方案來考慮的: 穩定性、小檔案合并、是否支援SQL、雲廠商支援程度、語言支援程度等幾個方面來考慮。

阿裡雲 EMR Delta Lake 在流利說資料接入中的架構和實踐背景技術方案選型遇到的挑戰帶來的收益後續規劃

基于以上名額,加上我們整個資料平台都是基于阿裡雲 EMR 搭建的,選擇 Delta Lake 的話,會省掉大量的适配開發工作,是以我們最終選擇了 CDC + Delta Lake 的方案。

整體架構

阿裡雲 EMR Delta Lake 在流利說資料接入中的架構和實踐背景技術方案選型遇到的挑戰帶來的收益後續規劃

總體架構圖

整體的架構如上圖所示。我們接入的資料會分為兩部分,存量曆史資料和新資料,存量曆史資料使用 DataX 從 MySQL 中導出,存入 OSS 中,新資料使用 Binlog 采集存入 Delta Lake 表中。每日淩晨跑 ETL 任務前,先對曆史資料和新資料做 Merge 操作,ETL 任務使用 Merge 之後的資料。

Delta Lake 資料接入

在 Binlog 實時采集方面,我們采用了開源的 Debezium ,負責從 MySQL 實時拉取 Binlog 并完成适當解析,每張表對應一個 Topic ,分庫分表合并為一個 Topic 分發到 Kafka 上供下遊消費。Binlog 資料接入到 Kafka 之後,我們需要建立 Kafka Source 表指向對應的 Kafka Topic 中, 表的格式為:

CREATE TABLE

kafka_{db_name}_{table_name}

(

key

BINARY,

value

topic

STRING,

partition

INT,

offset

BIGINT,

timestamp

TIMESTAMP,

timestampType

INT)

USING kafka

OPTIONS (

kafka.sasl.mechanism

'PLAIN',

subscribe

'cdc-{db_name}-{table_name}',

serialization.format

'1',

kafka.sasl.jaas.config

'*****(redacted)',

kafka.bootstrap.servers

'{bootstrap-servers}',

kafka.security.protocol

'SASL_PLAINTEXT'

)

我們主要用到的字段是 value 和 offset ,其中 value 的格式如下:

{

"payload": {

"before": {

db記錄變更前的schema及内容,op=c時,為null

},

"after": {

db記錄變更後的schema及内容,op=d時,為null

"source": {

ebezium配置資訊

"op": "c",

"ts_ms":

}

同時建立 Delta Lake 表,Location 指向 HDFS 或者 OSS ,表結構為:

CREATE TABLE IF NOT EXISTS delta.delta_{dbname}{table_name}(

{row_key_info},

ts_ms bigint,

json_record string,

operation_type string,

offset bigint

USING delta

LOCATION '------/delta/{db_name}.db/{table_name}'

其中 row_key_info 為 Delta Lake 表的唯一索引字段,對于單庫單表而言,row_key_info 為 mysql 表的 primary key 字段 eg: id long,對于分庫分表及分執行個體分庫分表而言,row_key_info 為分庫分表的字段和單表裡primary key 字段組成,eg: 以 user_id 為分表字段,每張表裡以 id 為 primary key , 那麼對應的 row_key_info 為 id long, user_id long。

StreamingSQL 處理 Kafka 中的資料,我們主要是提取 Kafka Source 表中的 offset、value 字段及 value 字段中的 CDC 資訊如: op、ts_ms 及 payload 的 after 和 before 字段。StreamingSQL 中,我們采用 5min 一個 mini batch,主要是考慮到 mini batch 太小會産生很多小檔案,處理速度會越來越慢,也會影響讀的性能,太大了又沒法滿足近實時查詢的要求。而 Delta Lake 表,我們不将 after 或者 before 字段解析出來,主要是考慮到我們業務表 的 schema 經常變更,業務表 schema 一變更就要去修複一遍資料,成本比較大。在 StreamingSQL 處理過程中,對于 op=’c’ 的資料我們會直接 insert 操作,json_record 取 after 字段。對于 op=’u’ 或者 op=’d’ 的資料,如果 Delta Lake 表中不存在,那麼執行 insert 操作, 如果存在,那麼執行 update 操作;json_record 的指派值,op=’d’,json_record 取 before 字段,op=’u’,jsonrecord 取 after 字段。保留 op=’d’ 的字段,主要是考慮到删除的資料可能在存量曆史表中,如果直接删除的話,淩晨 merge 的資料中,存在存量曆史表中的資料就不會被删除。

整個 StreamingSQL 的處理大緻如下:

CREATE SCAN incremental{dbname}{tablename} on kafka{dbname}{table_name} USING STREAM

OPTIONS(

startingOffsets='earliest',

maxOffsetsPerTrigger='1000000',

failOnDataLoss=false

);

CREATE STREAM job

checkpointLocation='------/delta/{db_name}.db/{table_name}checkpoint',

triggerIntervalMs='300000'

MERGE INTO delta.delta{dbname}{table_name} as target

USING (

SELECT * FROM (

SELECT ts_ms, offset, operation_type, {key_column_sql}, coalesce(after_record, before_record) as after_record, row_number() OVER (PARTITION BY {key_column_partition_sql} ORDER BY ts_ms DESC, offset DESC) as rank

FROM (

SELECT ts_ms, offset, operation_type, before_record, after_record, {key_column_include_sql}

FROM ( SELECT get_json_object(string(value), '$.payload.op') as operation_type,

get_json_object(string(value), '$.payload.before') as before_record,

get_json_object(string(value), '$.payload.after') as after_record,

get_json_object(string(value), '$.payload.ts_ms') as tsms,

offset

FROM incremental{dbname}{table_name}

) binlog

) binlog_wo_init ) binlog_rank where rank = 1) as source

ON {key_column_condition_sql}

WHEN MATCHED AND (source.operation_type = 'u' or source.operation_type='d') THEN

UPDATE SET {set_key_column_sql}, ts_ms=source.ts_ms, json_record=source.after_record, operation_type=source.operation_type, offset=source.offset

WHEN NOT MATCHED AND (source.operation_type='c' or source.operation_type='u' or source.operation_type='d') THEN

INSERT ({inser_key_column_sql}, ts_ms, json_record, operation_type, offset) values ({insert_key_column_value_sql}, source.ts_ms, source.after_record, source.operation_type, source.offset);

執行完 StreamingSQL 之後,就會生成如下格式的資料:

其中 part-xxxx.snappy.parquet 儲存的是 DeltaLake 表的資料檔案,而 _deltalog 目錄下儲存的是 DeltaLake 表的中繼資料,包括如下:

其中 xxxxxxxx 表示的是版本資訊,xxxxxxxx.json 檔案裡儲存的是有效的 parquet 檔案資訊,其中 add 類型的為有效的 parquet 檔案, remove 為無效的 parquet 檔案。

Delta Lake 是支援 Time travel 的,但是我們 CDC 資料接入的話,用不到資料復原政策,如果多版本的資料一直保留會給我們的存儲帶來一定的影響,是以我們要定期删除過期版本的資料,目前是僅保留2個小時内的版本資料。同時,Delta Lake 不支援自動合并小檔案的功能,是以我們還需要定期合并小檔案。目前我們的做法是,每小時通過 OPTIMIZE 和 VACCUM 來做一次合并小檔案操作及清理過期資料檔案操作:

optimize delta{dbname}{tablename};

set spark.databricks.delta.retentionDurationCheck.enabled = false;

VACUUM delta{dbname}{table_name} RETAIN 1 HOURS;

由于目前 Hive 和 Presto 無法直接讀取 Spark SQL 建立的 Delta Lake 表,但是監控及近實時查詢需求,需要查詢 Delta Lake 表,是以我們還建立了用于 Hive 和 Presto 表查詢的。

Delta Lake 資料與存量資料 Merge

由于 Delta Lake 的資料我們僅接入新資料,對于存量曆史資料我們是通過DataX 一次性導入的,加上 Delta Lake 表 Hive 無法直接查詢,是以每日淩晨我們需要對這兩部分資料做一次 merge 操作,寫入到新的表中便于 Spark SQL 和 Hive 統一使用。這一子產品的架構大緻如下:

圖檔

阿裡雲 EMR Delta Lake 在流利說資料接入中的架構和實踐背景技術方案選型遇到的挑戰帶來的收益後續規劃

每日淩晨0點前,調用 DeltaService API ,根據 Delta Lake 任務的配置自動生成 merge任務 的 task 資訊、spark-sql 腳本及 對應的 Airflow DAG 檔案。

merge 任務的 task 資訊主要包括如下資訊:

阿裡雲 EMR Delta Lake 在流利說資料接入中的架構和實踐背景技術方案選型遇到的挑戰帶來的收益後續規劃

自動生成 Merge 腳本,主要是從 Delta Lake 任務的配置中擷取 mysql 表的schema 資訊,删掉曆史的 Hive 表,再根據 schema 資訊重新建立 Hive 外部表,再根據新的 schema 從Delta Lake表的 json_record 字段和曆史存量資料表中擷取對應的字段值做 union all 操作,缺失值采用mysql 的預設值, union 之後,再根據 row_key 進行分組,按 ts_ms 排序取第一條,同時取出operation_type=’d’ 的資料。整體如下:

CREATE DATABASE IF NOT EXISTS {db_name} LOCATION '------/delta/{db_name}.db';

DROP TABLE IF EXISTS

{db_name}

.

{table_name}

;

CREATE TABLE IF NOT EXISTS

{db_name}

{table_name}

{table_column_infos}

STORED AS PARQUET

LOCATION '------/delta/{db_name}.db/{table_name}/data_date=${{data_date}}';

INSERT OVERWRITE TABLE

{db_name}

{table_name}

SELECT {table_columns}

FROM ( SELECT {table_columns}, _operation_type, row_number() OVER (PARTITION BY {row_keys} ORDER BY ts_ms DESC) as ranknum

SELECT {delta_columns}, operation_type as _operation_type, tsms

FROM delta{dbname}{table_name}

UNION ALL

SELECT {hive_columns}, 'c' as _operation_type, 0 as ts_ms

FROM

{db_name}

{table_name}_delta_history

) union_rank

) ranked_data

WHERE ranknum=1

AND _operation_type <> 'd'

淩晨0點之後,Airflow 會根據 Airflow DAG 檔案自動排程執行 merge 的Spark SQL 腳本,腳本執行成功後,更新 merge task 的狀态為 succeed ,Airflow 的 ETL DAG 會根據merge task 的狀态自動排程下遊的 ETL 任務。

Delta Lake 資料監控

對于 Delta Lake 資料的監控,我們主要是為了兩個目的:監控資料是否延遲及監控資料是否丢失,主要是在 MySQL 與 Delta Lake 表之間及 CDC 接入過來的 Kafka Topic 與 Delta Lake 表之間。

CDC 接入過來的 Kafka Topic 和 Delta Lake 表之間的延遲監控:我們是每15分鐘從 Kafka 的 Topic 中擷取每個 Partition 的最大 offset 對應的 mysql 的 row_key 字段内容,放入監控的 MySQL 表 delta_kafka_monitor_info 中,再從 delta_kafka_monitor_info 中擷取上一周期的 row_key 字段内容,到 Delta Lake 表中查詢,如果查詢不到,說明資料有延遲或者丢失,發出告警。

MySQL 與 Delta Lake 之間的監控:我們有兩種,一種是探針方案,每15分鐘,從 MySQL 中擷取最大的 id,對于分庫分表,隻監控一張表的,存入 delta_mysql_monitor_info 中,再從 delta_mysql_monitor_info 中擷取上一周期的最大 id,到 Delta Lake 表中查詢,如果查詢不到,說明資料有延遲或者丢失,發出告警。另一種是直接 count(id),這種方案又分為單庫單表和分庫分表兩種,中繼資料儲存在 mysql 表 id_based_mysql_delta_monitor_info 中,主要包含 min_id、max_id、mysql_count 三個字段,對于單庫單表,也是每隔5分鐘,從 Delta Lake 表中擷取 min_id 和 max_id 之間的 count 值,跟 mysql_count 對比,如果小于 mysql_count 值說明有資料丢失或者延遲,發出告警。再從 mysql 中擷取 max(id) 和 max_id 與 max(id) 之間的 count 值,更新到 id_based_mysql_delta_monitor_info 表中。對于分庫分表的情況,根據分庫分表規則,生成每一張表對應的 id_based_mysql_delta_monitor_info 資訊,每半小時執行一遍監控,規則同單庫單表。

遇到的挑戰

業務表 schema 變更頻繁,Delta Lake 表如果直接解析 CDC 的字段資訊的話,如果不能及時發現并修複資料的話,後期修複資料的成本會較大,目前我們是不解析字段,等到淩晨 merge 的時候再解析。

随着資料量越來越大,StreamingSQL 任務的性能會越來越差。我們目前是 StreamingSQL 處理延遲,出現大量延遲告警後,将 Delta Lake 存量資料替換成昨日 merge 後的資料,再删掉 Delta Lake 表,删除 checkpoint 資料,從頭開始消費 KafkaSource 表的資料。降低 Delta Lake 表資料,進而緩解StreamingSQL 的壓力。

Hive 和 Presto 不能直接查詢 Spark SQL 建立的 Delta Lake 表,目前我們是建立支援 Hive 和 Presto 查詢的外部表來供 Hive 和 Presto 使用,但是這些表又無法通過 Spark SQL 查詢。是以上層 ETL 應用無法在不更改代碼的情況下,在 Hive 和 Spark SQL 及Presto 引擎之間自由切換。

帶來的收益

節省了 DB 從庫的成本,采用 CDC + Delta Lake 之後,我們的成本節省了近80%。

淩晨 DB 資料接入的時間成本大大降低,能夠確定所有非特殊要求的 DB 資料接入都能在1個小時内跑完。

後續規劃

StreamingSQL 任務随着 Delta Lake 表資料量越來越大,性能越來越差問題跟進。

推動能否解決 Spark SQL 建立的 Delta Lake 表,無法直接使用 Hive 和 Presto 查詢的問題。

歡迎對阿裡雲 EMR Delta Lake感興趣的朋友加入阿裡雲EMR釘釘群交流測試,群内會定期進行精品内容分享,測試請@揚流,釘釘群如下

阿裡雲 EMR Delta Lake 在流利說資料接入中的架構和實踐背景技術方案選型遇到的挑戰帶來的收益後續規劃

繼續閱讀