天天看點

資料湖建構服務搭配Delta Lake玩轉CDC實時入湖

什麼是CDC

Change Data Capture(CDC)用來跟蹤捕獲資料源的資料變化,并将這些變化同步到目标存儲(如資料湖或資料倉庫),用于資料備份或後續分析,同步過程可以是分鐘/小時/天等粒度,也可以是實時同步。CDC方案分為侵入式(intrusive manner)和非傾入性(non-intrusive manner)兩種。

資料湖建構服務搭配Delta Lake玩轉CDC實時入湖

侵入式

侵入式方案直接請求資料源系統(如通過JDBC讀取資料),會給資料源系統帶來性能壓力。常見的方案如下:

  • 最後更新時間(Last Modified)

源表需要有修改時間列,同步作業需要指定最後修改時間參數,表明同步某個時間點之後變更的資料。該方法不能同步删除記錄的變更,同一條記錄多次變更隻能記錄最後一次。

  • 自增id列

源表需要有一個自增id列,同步作業需要指定上次同步的最大id值,同步上次之後新增的記錄行。該方法也不能同步删除記錄的變更,而且老記錄的變更也無法感覺。

非侵入式

非侵入性一般通過日志的方式記錄資料源的資料變化(如資料庫的binlog),源庫需要開啟binlog的功能。資料源的每次操作都會被記錄到binlog中(如insert/update/delete等),能夠實時跟蹤資料插入/删除/資料多次更新/DDL操作等。

示例:

insert into table testdb.test values("hangzhou",1);
update testdb.test set b=2 where a="hangzhou";
update testdb.test set b=3 where a="hangzhou";
delete from testdb.test where a="hangzhou";           
資料湖建構服務搭配Delta Lake玩轉CDC實時入湖

通過将binlog日志有序的回放到目标存儲中,進而實作對資料源的資料導出同步功能。

常見的CDC方案實作

開源常見的CDC方案實作主要有兩種:

Sqoop離線同步

sqoop

是一個開源的資料同步工具,它可以将資料庫的資料同步到HDFS/Hive中,支援全量同步和增量同步,使用者可以配置小時/天的排程作業來定時同步資料。

sqoop增量同步是一種侵入式的CDC方案,支援Last Modified和Append模式。

資料湖建構服務搭配Delta Lake玩轉CDC實時入湖

缺點:

  • 直接jdbc請求源庫拉取資料,影響源庫性能
  • 小時/天排程,實時性不高
  • 無法同步源庫的删除操作,Append模式還不支援資料更新操作

binlog實時同步

binlog日志可以通過一些工具實時同步到kafka等消息中間件中,然後通過Spark/Flink等流引擎實時的回放binlog到目标存儲(如Kudu/HBase等)。

資料湖建構服務搭配Delta Lake玩轉CDC實時入湖
  • Kudu/HBase運維成本高
  • Kudu在資料量大的有穩定性問題, HBase不支援高吞吐的分析
  • Spark Streaming實作回放binlog邏輯複雜,使用java/scala代碼具有一定門檻

Streaming SQL+Delta Lake實時入湖方案

前面介紹了兩種常見的CDC方案,各自都有一些缺點。

阿裡雲E-MapReduce

團隊提供了一種新的CDC解決方案,利用

自研的Streaming SQL

搭配

Delta Lake

可以輕松實作CDC實時入湖。這套解決方案同時通過阿裡雲最新釋出的資料湖建構(Data Lake Formation,DLF)服務提供一站式的入湖體驗。

資料湖建構服務搭配Delta Lake玩轉CDC實時入湖

Streaming SQL

Spark Streaming SQ

L在Spark Structured Streaming之上提供了SQL能力,降低了實時業務開發的門檻,使得離線業務實時化更簡單友善。

Spark Streaming SQL支援的文法如下:

資料湖建構服務搭配Delta Lake玩轉CDC實時入湖

下面以實時消費SLS為例:

# 建立loghub源表
spark-sql> CREATE TABLE loghub_intput_tbl(content string)
         > USING loghub
         > OPTIONS
         > (...) 
# 建立delta目标表
spark-sql> CREATE TABLE delta_output_tbl(content string)
         > USING delta
         > OPTIONS
         > (...);
# 建立流式SCAN
spark-sql> CREATE SCAN loghub_table_intput_test_stream
         > ON loghub_intput_tbl
         > USING STREAM;
# 将loghub源表資料插入delta目标表         
spark-sql> INSERT INTO delta_output_tbl SELECT content FROM loghub_table_intput_test_stream;           

是Databricks開源的一種資料湖格式,它在parquet格式之上,提供了ACID事務/中繼資料管理等能力,同時相比parquet具有更好的性能,能夠支援更豐富的資料應用場景(如資料更新/schema演化等)。

資料湖建構服務搭配Delta Lake玩轉CDC實時入湖
E-MapReduce

團隊在開源Delta Lake基礎上做了很多功能和性能的優化,如小檔案合并Optimize/DataSkipping/Zorder,SparkSQL/Streaming SQL/Hive/Presto深度內建Delta等。

資料湖建構服務搭配Delta Lake玩轉CDC實時入湖

Streaming SQL+Delta Lake CDC實時入湖

Spark Streaming SQL提供了Merge Into 的文法,搭配Delta Lake的實時寫入能力,可以很友善的實作CDC實時入湖方案。

資料湖建構服務搭配Delta Lake玩轉CDC實時入湖

如上圖所示,隻需要SQL就能完成CDC實時入湖,細節步驟詳見

E-MapReduce文檔

阿裡雲最新釋出的資料湖建構(Data Lake Formation,DLF)服務,提供了完整的一站式入湖解決方案。

----

更多資料湖技術相關的文章請點選:

阿裡雲重磅釋出雲原生資料湖體系

更多資料湖相關資訊交流請加入阿裡巴巴資料湖技術釘釘群

資料湖建構服務搭配Delta Lake玩轉CDC實時入湖