天天看點

Apache hudi在騰訊的落地與實踐

作者:DataFunTalk

導讀 這裡放置導讀部分的内容随着 Apache Parquet 和 Apache ORC 等存儲格式以及 Presto 和 Apache Impala 等查詢引擎的發展,Hadoop 生态系統有潛力作為面向分鐘級延時場景的通用統一服務層。然而,為了實作這一點,Hadoop 生态系統需要在 Hadoop 分布式檔案系統(HDFS)中進行高效、低延遲的資料攝取和資料準備。為了解決這個問題,我們在Uber内部建構了 Hudi。Apache Hudi 是新一代流式資料湖平台,支援插入、更新、删除以及增量資料處理;可助力建構高效的企業級資料湖,目前已經在國内外多個大型公司生産落地。今天會和大家分享 Apache Hudi 在騰訊的落地與實踐。

全文目錄如下:

‍1. Hudi 的功能與構成

2. Hudi 的基本概念介紹

3. Hudi 的應用場景

4. Hudi 的未來展望

分享嘉賓|Forward Xu(徐前進) 騰訊資料湖研發進階工程師

編輯整理|張文鳳

出品社群|DataFun

01

Hudi的功能與構成

1. HUDI 的能力和定位

Apache Hudi是一個基于資料庫核心的流式資料湖平台,支援流式工作負載,事務,并發控制,Schema演進與限制;同時支援 Spark/Presto/Trino/Hive 等生态對接,在資料庫核心側支援可插拔索引的更新,删除,同時會自動管理檔案大小,資料 Clustering,Compaction,Cleaning 等。

2. HUDI 的整體結構

Apache hudi在騰訊的落地與實踐

可以基于雲存儲/HDFS 建構基于 Hudi 的 Lakehouse,Hudi 支援 Parquet、ORC、HFile、Avro 格式,同時提供了非常豐富的 API,如 Spark DF、RDD、FlinkSQL、Flink DataStream API,利用這些 API 可以非常友善地對 Hudi 表進行操作,同時 Hudi 也內建了其他生态,如 MPP 引擎 StarRocks,Doris 等。

--

02

Hudi的基本概念介紹

基本概念介紹:

Apache hudi在騰訊的落地與實踐

Hudi 的基本概念由 Timeline 和 File Layout 組成

  • Timeline 由一個個 commit 組成,包括 DELTA_COMMIT,COMMITS,CLEANS,ROLLBACK,REPLACECOMMIT 等,同時每個 commit 都包含對應的狀态,如 requested,inflight,completed 三種狀态,分别代表請求開始處理,正在處理,處理完成。
  • File Layout 主要由 FileGroup 構成,FileGroup 由 FileSlice 組成,每個 FileSlice 相當于一個版本,包含一個 Base 檔案和多個 Log 檔案。

Hudi支援MOR和COW兩種類型,MOR表對流式寫入更友好,延遲更低,對于更新的log檔案支援同步和異步兩種模式進行 Compaction,生成新的Base檔案,以加速查詢,支援 Snapshot,Read Optimized,Incremental 讀取。

Apache hudi在騰訊的落地與實踐

而對于COW表,每次寫入需要重寫檔案,寫放大相對嚴重,延遲相對MOR較高,更适合寫少讀多的場景。

Apache hudi在騰訊的落地與實踐

為了加速資料的更新,Hudi 支援多種索引,如分區級别的索引以及全表索引,分區級别的索引可以保證資料在分區内的唯一性,全表索引保證資料在表級的唯一性(開銷較大)。Hudi 支援了多種類型的索引實作,典型的如 BLOOM、BUCKET 索引,以及自定義索引等方式。

Apache hudi在騰訊的落地與實踐

另外一個核心的概念是Hudi的Table Service,包含Compaction操作,Compaction針對FileSlice進行操作,會将Base檔案和其對應的Log檔案進行合并,産生新的Base檔案;可以通過指定NUM_COMMITS或TIME_ELAPSED兩種政策排程執行Compaction,對于排程執行而言,Hudi為不影響主鍊路的寫入,支援了異步排程與執行,以及同步排程與執行,同步排程異步執行方式,滿足不同的需求。

Apache hudi在騰訊的落地與實踐

另外一個Table Service是Clean,Clean用于删除過期的檔案,同樣與Compaction類型也提供了多種政策以及排程執行政策,值得注意的是對于做了Savepoint的時間點,其對應的檔案不會被删除。

Apache hudi在騰訊的落地與實踐

接下來分析對于COW表的不同查詢的實作,如在instant 0 時刻寫入一部分資料(ABCDE),在instant 1時刻更新A -> A',D -> D',在instant 2時刻更新A' -> A'',E -> E',并插入F 那麼對于快照查詢(Snapshot Query)每次都是讀取的最新的FileSlice,增量查詢(Incremental Query)讀取指定commit之間的Parquet檔案,然後再将時間範圍下推至Parquet檔案進行過濾,隻讀取符合條件的變更的資料。

Apache hudi在騰訊的落地與實踐

對于MOR表,快照查詢(SNAPSHOT Query)讀取的是Base檔案與Log合并後的最新結果;而增量查詢讀取指定commit之間的Parquet以及Log檔案,然後再對Log檔案進行Block級别的過濾(根據Commit時間),合并重複key後傳回結果。

--

03

Hudi 的應用場景

1. CDC 資料入湖

Apache hudi在騰訊的落地與實踐

這個場景主要是DB資料入湖入倉,把原來T + 1的資料新鮮度提升到分鐘級别。資料新鮮度通過目前比較火的以Debezium、Maxwell為代表的CDC(change Data Capture)技術實作。以Streaming近實時的方式同步到數倉中。在傳統的Hive數倉中想保證明時是非常困難的,尤其是檔案更新,湖表實時寫入更新,基本不可能實作。

CDC技術對數倉本身存儲是有要求的,首先是更新效率得足夠高,能夠支援以Streaming方式寫入,并且能夠非常高效的更新。尤其是CDC log在更新過程中還可能會亂序,如何保證這種亂序更新的ACID語義,是有很高要求的,目前能滿足亂序更新的湖格式隻有Hudi能做到,而且Hudi還考慮到了更新的效率問題,是目前比較先進的架構。

圖中方案3相比上面的方案,比較适合體量比較大(每天增量能達到億級别)、資料平台比較健全的公司,中間有一套統一的資料同步方案(彙總不同源表資料同步至消息隊列),消息隊列承擔了資料的容錯、容災、緩存功能。同時,這套方案的擴充性也更好。通過Kafka的topic subscribe方式,可以比較靈活地分發資料。通過以上三種方式入湖Hudi,以某資料中台為例已經有6000多張源表寫入Hudi,日增幾十億資料入湖。

2. 分鐘級實時數倉

Apache hudi在騰訊的落地與實踐

第二個場景是構造分鐘級别的實時數倉,分鐘級别的端到端資料新鮮度,同時又非常開放的OLAP查詢引擎可以适配。其實是對Kappa架構或者是原先Streaming數倉架構的一套新解法。在沒有這套架構之前,實時分析會跳過Hudi直接把資料雙寫到OLAP系統中,比如ClickHouse、ES、MongoDB等。當數倉存儲已經可以支援高效率分級别更新,能夠對接OLAP引擎,那麼這套架構就被大大簡化,首先不用雙寫,一份資料就可以保證only one truth語義,避免雙寫帶來資料完整性的問題。其次因為湖格式本身是非常開放的,在查詢端引擎可以有更多選擇,比如Hudi就支援Presto、Trino、Spark、StarRocks,以及雲廠商的Redshift引擎,會有非常高的靈活度。多層資料可見性也從T+1 小時或天,縮短到分鐘級别。

3. 流式計算PV/UV

Apache hudi在騰訊的落地與實踐

Apache Hudi 的Payload是一種可擴充的資料處理機制,通過不同的Payload我們可以實作複雜場景的定制化資料寫入方式,大大增加了資料處理的靈活性。Hudi Payload 在寫入和讀取 Hudi 表時對資料進行去重、過濾、合并等操作的工具類,通過使用參數"hoodie.datasource.write.payload.class"指定我們需要使用的Payload class。為了實作 pv/uv計算,我們實作了 RecordCountAvroPayload ,它可以在對資料去重的時候,将重複資料的數量記錄下來,這裡的重複指的是HoodieKey(primary key + partition path)相同。以往處理方式是通過Flink + window 聚合實作,該方式有延遲資料丢棄和state爆掉風險,Hudi Payload機制則沒有這些風險。

4. 多流拼接(大寬表)

Apache hudi在騰訊的落地與實踐

上圖是一個典型的非常複雜的業務落地,消息流1由Kafka寫入Hudi商品銷售明細表,消息流2由Kafka寫入Hudi使用者基本屬性表,然後結合Hudi商品标簽表和Hve使用者擴充屬性表進行實時和離線拼接大寬表。

Apache hudi在騰訊的落地與實踐

在實作多流拼接功能前有三個前置條件需要滿足:

  • 基于樂觀鎖的Timeline
  • 基于marker的早期沖突檢測
  • 啟用occ(樂觀并發控制)

這裡主要描述基于時間線伺服器的标記機制,該機制優化了存儲标記的相關延遲。Hudi 中的時間線伺服器用作提供檔案系統和時間線視圖。如下圖所示,新的基于時間線伺服器的标記機制将标記建立和其他标記相關操作從各個執行器委托給時間線伺服器進行集中處理。時間線伺服器在記憶體中為相應的标記請求維護建立的标記,時間線伺服器通過定期将記憶體标記重新整理到存儲中有限數量的底層檔案來實作一緻性。通過這種方式,即使資料檔案數量龐大,也可以顯著減少與标記相關的實際檔案操作次數和延遲,進而提高寫入性能。

Apache hudi在騰訊的落地與實踐

實作的原理基本上就是通過自定義的 Payload class 來實作相同 key 不同源資料的合并邏輯,寫端會在批次内做多源的合并,并寫入 log,讀端在讀時合并時也會調用相同的邏輯來處理跨批次的情況。這裡需要注意的是亂序和遲到資料(out-of-order and late events)的問題。如果不做處理,在下遊經常會導緻舊資料覆寫新資料,或者列更新不完整的情況。針對亂序和遲到資料,我們對 Hudi 做了 Multiple ordering value 的增強,保證每個源隻能更新屬于自己那部分列的資料,并且可以根據設定的 event time (ordering value) 列,確定隻會讓新資料覆寫舊資料。最後結合 lock less multiple writers 來實作多 Job 多源的并發寫入。

Apache hudi在騰訊的落地與實踐

介紹多流拼接場景下 Snapshot Query 的核心過程,即先對 LogFile 進行去重合并,然後再合并 BaseFile 和去重後的 LogFile 中的資料。上圖顯示了整個資料合并的過程,具體可以拆分成以下兩個過程:

(1)Merge LogFile

Hudi 現有邏輯是将 LogFile 中的資料讀出來存放在 Map 中,對于 LogFile 中每條 Record,如果 Key 不存在 Map 中,則直接放入 Map,如果 Key 已經存在于 Map 中,則需要更新操作。

在多流拼接中,因為 LogFile 中存在不同資料流寫入的資料,即每條資料的列可能不相同,是以在更新的時候需要判斷相同 Key 的兩個 Record 是否來自同一個流,是則做更新,不是則做拼接。如圖 3 所示,讀到 LogFile2 中的主鍵是 key1 的 Record 時,key1 對應的 Record 在 Map 中已經存在,但這兩個 Record 來自不同流,則需要拼接形成一條新的 Record (key1,b0_new,c0_new,d0_new) 放入 Map 中。

(2)Merge BaseFile and LogFile

Hudi 現有預設邏輯是對于每一條存在于 BaseFile 中的 Record,檢視 Map 中是否存在 key 相同的 Record,如果存在,則用 Map 中的 Record 覆寫 BaseFile 中的 Record。在多流拼接中,Map 中的 Record 不會完整覆寫 BaseFile 中對應的 Record,可能隻會更新部分列的值,即 Map 中的 Record 對應的列。

如上圖所示,以最簡單的覆寫邏輯為例,當讀到 BaseFile 中的主鍵是 key1 的 Record 時,發現 key1 在 Map 中已經存在并且對應的 Record 有 BCD 三列的值,則更新 BaseFile 中的 BCD 列,得到新的 Record(key1,b0_new,c0_new,d0_new,e0),注意 E 列沒有被更新,是以保持原來的值 e0。對于新增的 Key 如 Key3 對應的 Record,則需要将 BCE 三列補上預設值形成一條完整的 Record。

5. 批流探索-廣告歸因

Apache hudi在騰訊的落地與實踐

廣告歸因是指在使用者在廣告行為鍊路中,使用科學的比對模型兩兩比對各環節的行為資料點,可用于判斷使用者從何管道下載下傳應用(或打開小程式),通過比對使用者廣告行為,分析是何原因促使使用者産生轉化。廣告歸因的資料結果是衡量廣告效果、評估管道品質的重要依據,可幫助廣告主合理優化廣告素材,高效開展拉新、促活營銷推廣,而實時廣告歸因則能更及時地應用到優化廣告投放的過程中。

在增長買量業務場景中,買量團隊在快手、百度、位元組等管道上投放廣告,比如某雲遊戲廣告素材,吸引潛在使用者點選廣告,進入業務開始玩雲遊戲,也可以下載下傳遊戲的APK安裝包,進而實作将使用者轉化成業務新增使用者和遊戲新增使用者的目的。如下圖所示,管道方可以擷取使用者的點選資料,業務可以擷取新增使用者的資料,在點選歸因鍊路中,就是将業務新增使用者比對到使用者在某管道上近N天的最後一次廣告點選,在正常的業務過程中,先有使用者點選廣告資料,後有業務新增使用者資料,根據離線資料統計經驗,點選轉化成新增使用者的視窗時間最長不超過3天,也就是N=3。

Apache hudi在騰訊的落地與實踐
  • 資料流一,Flink SQL消費點選資料,并通過Upsert方式(row-level update)寫入資料湖Hudi點選表,MOR特性取最後一次點選資料。
  • 資料流二,Flink SQL消費應用寶新增資料,通過Append方式寫入資料湖Hudi新增表。
  • 批處理三,Super SQL讀Hudi新增表(當日)、Hudi點選表(近N天)關聯,通過Merge Into文法(row-level update)寫入歸因結果Hudi表。Super SQL底層計算引擎是Spark3,該任務通過US系統每10分鐘排程一次。
  • 資料流四,Flink SQL通過snapshot-id方式(流式讀取)将歸因結果表實時出湖到CDMQ,保持資料應用接口和方案一緻。
Apache hudi在騰訊的落地與實踐

基于Hudi方案優勢如下:

  • 準實時和離線資料統一存儲,歸因率和T+1保持一緻,Hudi歸因率從原來的80%提升至 85%。
  • •Flink SQL、Super SQL開發簡化了編碼過程,降低了開發成本。
  • 穩定性高,一般情況的資料延遲通過US在下個定時周期自動修複,維護成本低。
  • 時效性是10分鐘排程+3分鐘運作 <15分鐘。

6. 批流探索:流轉批

在某些業務場景下,我們需要一個标志來衡量Hudi資料寫入的進度,比如:Flink 實時向 Hudi 表寫入資料,然後使用這個 Hudi 表來支援批量計算并通過一個 flag 來評估它的分區資料是否完整進而進一步寫入分區資料進行分區級别的ETL,這也就是我們通常說的流轉批。

Apache hudi在騰訊的落地與實踐

上左圖中Flink Sink包含了兩個算子。第一個writer 算子,它負責把資料寫入檔案,writer在checkpoint觸發時,會把自己寫入的最大的一個時間傳到commit算子中,然後commit算子從多個上遊傳過來的時間中選取一個最小值作為這一批送出資料的時間,并寫入HUDI表的中繼資料中。

我們的方案是将這個進度值(EventTime)存儲為 Hudi 送出(版本)中繼資料的屬性裡,然後通過通路這個中繼資料屬性擷取這個進度值。在下遊的批處理任務之前加一個監控任務去監控最新快照中繼資料。如果它的時間已經超過了目前的分區時間,就認為這個表的資料已經完備了,這個監控任務就會成功觸發下遊的批處理任務進行計算,這樣可以防止在異常場景下資料管道或者批處理任務空跑的情況。

上右圖是一個Flink 1分鐘級别入庫到HUDI ODS表, 然後通過流轉批計算寫入HUDI DWD表的執行過程。

Apache hudi在騰訊的落地與實踐

如何解決亂序到來問題, 我們可以通過設定SpedGapTime來設定允許延遲到來的範圍預設是0 不會延遲到來。我們同時也為社群做了Call Command的指令,其主要有50多個功能,主要覆寫在運維表、優化表、快照管理和中繼資料管理,可用其進行資料讀取、檔案合并、資料優化、統計表資訊等具體功能,便于內建與平台化。

Apache hudi在騰訊的落地與實踐
Apache hudi在騰訊的落地與實踐

--

04

Hudi的未來展望

Apache hudi在騰訊的落地與實踐
Apache hudi在騰訊的落地與實踐

Hudi前期support Change-Data-Capture,這張圖大緻描述了Hudi在Upsert、Delete所産生的更新的資料和底熱資料。它會産生before和after的資料,我們從得到的資料再得到更新或删除操作的資料,進而還原出使用者不能寫入的資料。

Apache hudi在騰訊的落地與實踐

我們在内部會有一個計算引擎,進行Parse+Query Optimizer和Physical Plan+Query Execution的功能,我們以SQL為統一入口,基于Hudi的Flink ETL,将我們所需要的資料在ODS層、DWD層、ADS層實時物化下來,統一通過Query改寫的手段,讓使用者在查詢時,直接讀取出物化的結果,達到更低時效的寫入物化結果,還能得到若幹倍的查詢性能的提升。該方案目前正在進行中,未來将不斷進行探索。

--

05

問答環節

Q1:流轉批的check腳本需要自己實作嗎?是否需要和排程系統進行內建?

A1:可以的,直接可以SQL的方式實作。但帶來的問題是你用spark去送出和執行這樣一個指令的話,可能會有一個申請資源的消耗。比如執行指令一秒鐘,申請資源兩分鐘,這樣就有一個開銷了。假如你用一個Java client的方式,然後內建到你的平台的話,那麼你仍然可以保留快速擷取未解資訊的邏輯。是以兩種方式都可以,不一定要寫腳本。

Q2:Call Command未來會支援Flink嗎?

A2:在Flink這塊還沒有看到類似Call Command的文法,它是一個來源于存儲過程的文法, Flink這一塊要加的話會晚一些,至少需要先把文法的部分實作。

今天的分享就到這裡,謝謝大家。

Apache hudi在騰訊的落地與實踐

▌2023亞馬遜雲科技中國峰會

2023年6月27-28日9:00-17:00,2023亞馬遜雲科技中國峰會将在上海世博中心舉辦。

本次峰會将會分享數百個技術話題與最佳實踐,覆寫汽車、制造、金融、醫療與生命科學、電商、遊戲、泛娛樂、電信、教育、數字化營銷等領域。

下面給大家預告一些精彩議題報名參會,請點選“下方連結”:

2023年亞馬遜雲科技中國峰會 - 因建構_而可見

大資料方向議題 算法方向議題
下一代“智能湖倉”架構演進 玩轉Stable Diffusion模型的微調與提示詞工程
資料合規與雲上安全架構建構實踐 智能搜尋技術在金融行業的應用
靈活資料分析架構詳解 基于開源LLM模型如何快速建構類ChatGPT應用?
雲原生資料庫最佳實踐 大語言模型(LLM)驅動的AIGC應用架構解密
智慧醫療: 本地化與全球化精選案例合集 生成式AI在遊戲行業的應用
技術人員如何抓住風口擷取成功? AIGC在網際網路行業與傳統行業的應用與創新案例