天天看點

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

嘉賓簡介:辛現銀,花名辛庸,阿裡巴巴計算平台事業部 EMR 技術專家,Apache Hadoop,Apache Spark contributor,對 Hadoop、Spark、Hive、Druid 等大資料元件有深入研究。目前從事大資料雲化相關工作,專注于計算引擎、存儲結構、資料庫事務等内容,今天為大家介紹Delta Lake 如何幫助雲使用者解決資料實時入庫的問題。

直播回放:

https://developer.aliyun.com/live/2894 以下是視訊内容精華整理。

一、CDC簡介

CDC是Change Data Capture的縮寫,也就是改變資料捕獲。比如在最開始的時候我們用工具将業務資料導入資料倉庫、資料湖當中,之後導入資料的時候我們希望反映資料的動态變化,進行增量導入,并且能夠盡快的捕獲這些變化資料,以便更快地進行後續的分析,而CDC技術能夠幫助我們捕獲這些變化的資料。

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

大資料場景下我們常用的工具是Sqoop,它是一個批處理模式的工具,我們可以用它把業務庫中的資料導入到資料倉庫。需要注意的時候我們在導入之前要在業務庫中的資料中選出能反映時間變化的字段,然後依據時間戳将發生變化的資料導入資料倉庫中,這是使用它的一個限制。另外,這個工具還有如下幾個缺點:

  • 對源庫産生壓力;
  • 延遲大,依賴于調用它的頻次;
  • 無法處理delete事件,源庫中被delete的資料無法同步在數倉中被delete;
  • 無法應對schema變動,一旦源庫中的scheme發生變化,就在對數倉中的表模型重建立模和導入。

除了使用 sqoop,還有一種方式是使用binlog 的方式進行資料同步。源庫在進行插入、更新、删除等操作的時候會産生binlog,我們隻需要将binlog打入KafKa,從 Kafka 中讀取 binlog,逐條解析後執行對應的操作即可。但是這種方式要求下遊能夠支援比較頻繁的update/delete操作,以應對上遊頻繁的 update/delete 情形。這裡可以選擇KUDU或者HBASE 作為目标存儲。但是,由于KUDU和HBASE不是數倉,無法存儲全量的資料,是以需要定期把其中的資料導入到Hive中,如下圖所示。需要注意的是,這種方式存在多個元件運維壓力大、Merge邏輯複雜等缺點。

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

二、基于Spark Streaming SQL & Delta 的CDC方案

(一)Spark Streaming SQL

Spark Streaming SQL是阿裡巴巴計算平台事業部EMR團隊基于Spark Streaming開發的SQL支援,社群版本是沒有的。Spark Streaming SQL在這套CDC方案中不是必須的,但是它對于使用者更加的友好,尤其是對習慣于使用SQL的使用者來說,是以 EMR 團隊開發了 Spark Streaming SQL 的支援。如下圖所示,EMR 的 Spark Streaming SQL在諸多方面實作了對SQL文法的支援,比如DDL、DML、SELECT等等,下面撿幾個分别予以介紹。

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

(1)CREATE SCAN & CREATE STREAM

下面所示的是一個例子,我們的目标是從KafKa中的一張表中select一些資料,設計目标是盡可能的支援批和流兩種方式。在普通的SQL中,實際上select就産生了讀操作,但是這裡為了區分batch和Streaming,我們需要顯式的create scan,因為我們無法從data source上區分是batch讀還是Streaming讀,如果是batch,我們就使用 USING batch,如果是Streaming,我們就使用USING stream。

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

對于 batch scan,在create scan之後就可以直接從scan中select,把scan當作一張表;然而對于Streaming,如果要讀這個scan,就需要設計很多參數,因為要發起一個job,于是有了如下圖所示的create stream文法,其本質是對select文法的封裝。

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

(2)MERGE INTO

另外一個比較核心的文法是MERGE INTO,其在Delta Lake的CDC方案中有着非常重要的地位。MERGE INTO的文法是比較複雜的,具體如下圖所示。需要注意的是MERGE INTO中的mergeCondition必須在源表和target表中産生一一對應的關系,不然如果一條 source record 對應多條 target records,系統就不知道應該對哪條進行操作了。是以這裡實際上要求 mergeCondition 是一個主鍵連接配接,或者等同于主鍵連接配接的效果。

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

除了上面介紹的幾個文法,為了大家更加友善地使用Spark Streaming SQL,我們還實作了一些其他的UDF,比如DELAY、TUMBLING等。

(二)Delta Lake

資料湖是近些年比較火熱的一個技術。早先大家用的都是一些比較成熟的資料倉庫系統,資料通過 ETL 導入到數倉。數倉的典型用途是用于 BI 報表之類的分析場景,場景比較有限。在移動網際網路時代,資料來源更加豐富多樣,資料結構也不僅僅是結構化資料,資料用途也不僅限于分析,于是出現了資料湖。資料先不做,或者僅做簡單的處理導入到資料湖,然後再進行篩選、過濾、轉換等 transform 操作,于是數倉時代的 ETL 變成了資料湖時代的 ELT。

資料湖的典型架構是上層一個/或者多個分析引擎/或者其他計算架構,下層架設一個分布式存儲系統,如下圖左邊所示。但是這種原始的資料湖用法是缺少管理的,比如缺少事務的支援,缺少資料品質的校驗等等,一切資料管理完全靠人工手動保證。

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

Delta Lake 就是在統一的存儲層上面架上一層管理層,以解決人們手動管理資料湖資料的痛點。加上了一層管理層,首先我們就可以引入meta data管理,有了meta data管理,如果資料有schema,我們就可以管理schema,在資料入庫的過程中對資料品質進行校驗,并将不符合的資料剔除。另外,管理了meta data,還可以實作ACID Transactions,也就是事務的特性。在沒有管理層的時候如果進行并發的操作,多個操作之間可能互相影響,比如一個使用者在查詢的時候另外一個使用者進行了删除操作,有了事務的支援,就可以避免這種情況,在事務的支援下,每個操作都會生成一個快照,所有操作會生成一個快照序列,友善進行時間上的回溯,也就是時間旅行。

Data Warehouse、Data Lake和Delta Lake三者的主要特性對比如下圖所示。可以看出,Delta Lake相當于結合了Data Warehouse和Data Lake的優點,引入一個管理層,解決了大部分兩者的缺點。

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

(三)基于Spark Streaming SQL & Delta 的CDC方案

那麼,我們現在回到我們的主題,即,如何實作基于Spark Streaming SQL & Delta 的CDC方案呢?如下圖所示,還是先從binlog到KafKa,與之前的方式不同的是無需将KafKa中的binlog回放到HBASE或者KUDU,而是直接放入Delta Lake即可。這種方案使用友善,無需額外運維,Merge邏輯容易實作,且幾乎是一個實時的資料流。

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

上述方案的具體操作步驟如下圖所示。其本質就是不斷的将每一個mini batch給Merge INTO到目标表中。由于 Spark Streaming 的 mini batch 排程建個可以設定在秒級,是以該方案實作了近實時的資料同步。

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

在該方案的實際執行的過程中我們也遇到了一些問題,最主要的就是小檔案問題,比如每五秒執行一次batch,那麼一天就會有非常多的batch,可能産生海量的小檔案,嚴重影響表的查詢性能。對于小檔案問題,其解決思路有以下幾個:

  • 增大排程批次間隔:如果對實時性要求不是很高,可以增大排程批次間隔,減少小檔案産生的頻率;
  • 小檔案合并:進行小檔案的合并,減小小檔案的數量,其文法如下:

    OPTIMIZE WHERE where_clause]

  • 自适應執行:自适應執行可以合并一些小的reduce task,進而減少小檔案數量。

對于小檔案合并的optimize觸發我們做了兩種方式。第一種是自動化的optimize,就是在每一個mini batch執行完之後都進行檢測是否需要進行合并,如果不需要就跳到下一個mini batch,判斷的規則有很多,比如小檔案達到一定數量、總得檔案體積達到一定大小就進行合并,當然在合并的時候也進行了一些優化,比如過濾掉本身已經比較大的檔案。自動化的optimize方式每過一定數量的batch就要進行一次merge操作,可能對資料資料攝入造成一定影響,是以還有第二種方式,就是定期執行optimize的方式,這種方式對于資料實時攝入沒有影響。但是,定期執行optimize的方式會存在事務沖突的問題,也就是optimize與流沖突,對于這種情況我們優化了Delta内部的事務送出機制,讓insert流不必失敗,如果在optimize之前進行了update/delete,而optimize成功了,那麼在成功之後要加一個重試的過程,以免流斷掉。

OPTIMIZE的實作也是比較複雜的,我們開發了bin-packing機制和自适應機制,達成的效果就是在OPTIMIZE後所有檔案(除了最後一個)都達到目标大小(比如128M),而不論是否做了 re-partition。

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

三、未來工作

未來,以下幾方面将會是我們的工作目标:

(1)自動Schema檢測

使用Delta Lake的使用者接觸的可能不隻是業務資料,還可能有機器資料。在很多場景下,機器資料的字段可能會發生變化。對于這種場景的使用者來說,迫切需要一種自動Schema檢測的機制。下一階段我們的目标就是在binlog解析的時候能夠自動檢測新增字段、變化字段等,并且反映在Delta表中。

(2)流式Merge性能(Merge on Read)

上面提到了Spark Streaming SQL & Delta 的CDC方案本質上是發起了一個流處理,然後按照mini batch将資料merge到目标表中,merge的實作實際上是一個join,當表越來越大的時候merge性能會越來越差,嚴重影響性能。解決這個問題的方式是采用merge on read的方式,就是類似于HIVE的方式,是我們下一步的目标。

(3)更易用的體驗

可以看到,上文提到的CDC方案還是需要使用者有一定的專業知識,并且需要手動做一些工作,下一步我們希望能夠提供更易用的體驗,進一步降低使用者的使用負擔。

關鍵詞:Delta Lake、CDC、實時數倉、OPTIMIZE、Spark Streaming SQL

EMR釘釘産品交流群

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

Apache Spark技術交流社群公衆号,微信掃一掃關注

Delta Lake 如何幫助雲使用者解決資料實時入庫問題

繼續閱讀