作者:張宏博,Soul大資料工程師
一、背景介紹
(一)業務場景
傳統離線數倉模式下,日志入庫前首要階段便是ETL,Soul的埋點日志資料量龐大且需動态分區入庫,在按day分區的基礎上,每天的動态分區1200+,分區資料量大小不均,數萬條到數十億條不等。下圖為我們之前的ETL過程,埋點日志輸入Kafka,由Flume采集到HDFS,再經由天級Spark ETL任務,落表入
Hive。任務淩晨開始運作,資料處理階段約1h,Load階段1h+,整體執行時間為2-3h。

(二)存在的問題
在上面的架構下,我們面臨如下問題:
1.天級ETL任務耗時久,影響下遊依賴的産出時間。
2.淩晨占用資源龐大,任務高峰期搶占大量叢集資源。
3.ETL任務穩定性不佳且出錯需淩晨解決、影響範圍大。
二、為什麼選擇Delta?
為了解決天級ETL逐漸尖銳的問題,減少資源成本、提前資料産出,我們決定将T+1級ETL任務轉換成T+0實時日志入庫,在保證資料一緻的前提下,做到資料落地即可用。
之前我們也實作了Lambda架構下離線、實時分别維護一份資料,但在實際使用中仍存在一些棘手問題,比如:無法保證事務性,小檔案過多帶來的叢集壓力及查詢性能等問題,最終沒能達到理想化使用。
是以這次我們選擇了近來逐漸進入大家視野的資料湖架構,資料湖的概念在此我就不過多贅述了,我了解它就是一種将中繼資料視為大資料的Table Format。目前主流的資料湖分别有Delta Lake(分為開源版和商業版)、Hudi、Iceberg,三者都支援了ACID語義、Upsert、Schema動态變更、Time Travel等功能,其他方面我們做些簡單的總結對比:
開源版Delta
優勢:
1.支援作為source流式讀
2.Spark3.0支援sql操作
劣勢:
1.引擎強綁定Spark
2.手動Compaction
3.Join式Merge,成本高
Hudi
1.基于主鍵的快速Upsert/Delete
2.Copy on Write / Merge on Read 兩種merge方式,分别适配讀寫場景優化
3.自動Compaction
1.寫入綁定Spark/DeltaStreamer
2.API較為複雜
Iceberg
1.可插拔引擎
1.調研時還在發展階段,部分功能尚未完善
2.Join式Merge,成本高
調研時期,阿裡雲的同學提供了EMR版本的Delta,在開源版本的基礎上進行了功能和性能上的優化,諸如:SparkSQL/Spark Streaming SQL的內建,自動同步Delta中繼資料資訊到HiveMetaStore(MetaSync功能),自動Compaction,适配Tez、Hive、Presto等更多查詢引擎,優化查詢性能(Zorder/DataSkipping/Merge性能)等等
三、實踐過程
測試階段,我們回報了多個EMR Delta的bug,比如:Delta表無法自動建立Hive映射表,Tez引擎無法正常讀取Delta類型的Hive表,Presto和Tez讀取Delta表資料不一緻,均得到了阿裡雲同學的快速支援并一一解決。
引入Delta後,我們實時日志入庫架構如下所示:
資料由各端埋點上報至Kafka,通過Spark任務分鐘級以Delta的形式寫入HDFS,然後在Hive中自動化建立Delta表的映射表,即可通過Hive MR、Tez、Presto等查詢引擎直接進行資料查詢及分析。
我們基于Spark,封裝了通用化ETL工具,實作了配置化接入,使用者無需寫代碼即可實作源資料到Hive的整體流程接入。并且,為了更加适配業務場景,我們在封裝層實作了多種實用功能:
1. 實作了類似Iceberg的hidden partition功能,使用者可選擇某些列做适當變化形成一個新的列,此列可作為分區列,也可作為新增列,使用SparkSql操作。如:有日期列date,那麼可以通過 'substr(date,1,4) as year' 生成新列,并可以作為分區。
2. 為避免髒資料導緻分區出錯,實作了對動态分區的正則檢測功能,比如:Hive中不支援中文分區,使用者可以對動态分區加上'\w+'的正則檢測,分區字段不符合的髒資料則會被過濾。
3. 實作自定義事件時間字段功能,使用者可選資料中的任意時間字段作為事件時間落入對應分區,避免資料漂移問題。
4. 嵌套Json自定義層數解析,我們的日志資料大都為Json格式,其中難免有很多嵌套Json,此功能支援使用者選擇對嵌套Json的解析層數,嵌套字段也會被以單列的形式落入表中。
5. 實作SQL化自定義配置動态分區的功能,解決埋點資料傾斜導緻的實時任務性能問題,優化資源使用,此場景後面會詳細介紹。
平台化建設:我們已經把日志接入Hive的整體流程嵌入了Soul的資料平台中,使用者可通過此平台申請日志接入,由審批人員審批後進行相應參數配置,即可将日志實時接入Hive表中,簡單易用,降低操作成本。
為了解決小檔案過多的問題,EMR Delta實作了Optimize/Vacuum文法,可以定期對Delta表執行Optimize文法進行小檔案的合并,執行Vacuum文法對過期檔案進行清理,使HDFS上的檔案保持合适的大小及數量。值得一提的是,EMR Delta目前也實作了一些auto-compaction的政策,可以通過配置來自動觸發compaction,比如:小檔案數量達到一定值時,在流式作業階段啟動minor compaction任務,在對實時任務影響較小的情況下,達到合并小檔案的目的。
四、問題 & 方案
接下來介紹一下我們在落地Delta的過程中遇到過的問題
(一)埋點資料動态分區資料量分布不均導緻的資料傾斜問題
Soul的埋點資料是落入分區寬表中的,按埋點類型分區,不同類型的埋點資料量分布不均,例如:通過Spark寫入Delta的過程中,5min為一個Batch,大部分類型的埋點,5min的資料量很小(10M以下),但少量埋點資料量卻在5min能達到1G或更多。資料落地時,我們假設DataFrame有M個partition,表有N個動态分區,每個partition中的資料都是均勻且混亂的,那麼每個partition中都會生成N個檔案分别對應N個動态分區,那麼每個Batch就會生成M*N個小檔案。
為了解決上述問題,資料落地前對DataFrame按動态分區字段repartition,這樣就能保證每個partition中分别有不同分區的資料,這樣每個Batch就隻會生成N個檔案,即每個動态分區一個檔案,這樣解決了小檔案膨脹的問題。但與此同時,有幾個資料量過大的分區的資料也會隻分布在一個partition中,就導緻了某幾個partition資料傾斜,且這些分區每個Batch産生的檔案過大等問題。
解決方案:如下圖,我們實作了使用者通過SQL自定義配置repartition列的功能,簡單來說,使用者可以使用SQL,把資料量過大的幾個埋點,通過加鹽方式打散到多個partition,對于資料量正常的埋點則無需操作。通過此方案,我們把Spark任務中每個Batch執行最慢的partition的執行時間從3min提升到了40s,解決了檔案過小或過大的問題,以及資料傾斜導緻的性能問題。
(二)應用層基于中繼資料的動态schema變更
資料湖支援了動态schema變更,但在Spark寫入之前,構造DataFrame時,是需要擷取資料schema的,如果此時無法動态變更,那麼便無法把新字段寫入Delta表,Delta的動态schena便也成了擺設。埋點資料由于類型不同,每條埋點資料的字段并不完全相同,那麼在落表時,必須取所有資料的字段并集,作為Delta表的schema,這就需要我們在建構DataFrame時便能感覺是否有新增字段。
解決方案:我們額外設計了一套中繼資料,在Spark建構DataFrame時,首先根據此中繼資料判斷是否有新增字段,如有,就把新增字段更新至中繼資料,以此中繼資料為schema建構DataFrame,就能保證我們在應用層動态感覺schema變更,配合Delta的動态schema變更,新字段自動寫入Delta表,并把變化同步到對應的Hive表中。
(三)Spark Kafka偏移量送出機制導緻的資料重複
我們在使用Spark Streaming時,會在資料處理完成後将消費者偏移量送出至Kafka,調用的是spark-streaming-kafka-0-10中的commitAsync API。我一直處于一個誤區,以為資料在處理完成後便會送出目前Batch消費偏移量。但後來遇到Delta表有資料重複現象,排查發現偏移量送出時機為下一個Batch開始時,并不是目前Batch資料處理完成後就送出。那麼問題來了:假如一個批次5min,在3min時資料處理完成,此時成功将資料寫入Delta表,但偏移量卻在5min後(第二個批次開始時)才成功送出,如果在3min-5min這個時間段中,重新開機任務,那麼就會重複消費目前批次的資料,造成資料重複。
解決方案:
1.StructStreaming支援了對Delta的exactly-once,可以使用StructStreaming适配解決。
2.可以通過其他方式維護消費偏移量解決。
(四)查詢時解析中繼資料耗時較多
因為Delta單獨維護了自己的中繼資料,在使用外部查詢引擎查詢時,需要先解析中繼資料以擷取資料檔案資訊。随着Delta表的資料增長,中繼資料也逐漸增大,此操作耗時也逐漸變長。
解決方案:阿裡雲同學也在不斷優化查詢方案,通過緩存等方式盡量減少對中繼資料的解析成本。
(五)關于CDC場景
目前我們基于Delta實作的是日志的Append場景,還有另外一種經典業務場景CDC場景。Delta本身是支援Update/Delete的,是可以應用在CDC場景中的。但是基于我們的業務考量,暫時沒有将Delta使用在CDC場景下,原因是Delta表的Update/Delete方式是Join式的Merge方式,我們的業務表資料量比較大,更新頻繁,并且更新資料涉及的分區較廣泛,在Merge上可能存在性能問題。
阿裡雲的同學也在持續在做Merge的性能優化,比如Join的分區裁剪、Bloomfilter等,能有效減少Join時的檔案數量,尤其對于分區集中的資料更新,性能更有大幅提升,後續我們也會嘗試将Delta應用在CDC場景。
五、後續計劃
1.基于Delta Lake,進一步打造優化實時數倉結構,提升部分業務名額實時性,滿足更多更實時的業務需求。
2.打通我們内部的中繼資料平台,實作日志接入->實時入庫->中繼資料+血緣關系一體化、規範化管理。
3.持續觀察優化Delta表查詢計算性能,嘗試使用Delta的更多功能,比如Z-Ordering,提升在即席查詢及資料分析場景下的性能。
歡迎試用
歡迎對阿裡雲 Delta Lake感興趣的朋友加入阿裡雲EMR釘釘群交流,群内會定期進行精品内容分享,測試請@揚流,釘釘群如下