天天看點

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

作者:Lakehouse
目錄
1. Shopee 資料系統建設中面臨的典型問題
2. 為什麼選擇 Hudi
3. Shopee 在 Hudi 落地過程中的實踐
4. 社群貢獻
5. 總結與展望
           

湖倉一體(LakeHouse)作為大資料領域的重要發展方向,提供了流批一體和湖倉結合的新場景。目前,企業許多業務中會遇到的資料及時性、準确性,以及存儲的成本等問題,都可以通過湖倉一體方案得到解決。

當下,幾個主流的湖倉一體開源方案都在不斷疊代開發中,業界的應用也都是在摸索中前行,在實際的使用中難免會遇到一些不夠完善的地方和未支援的特性。Shopee 内部在使用過程中基于開源的 Apache Hudi 定制了自己的版本,以實作企業級的應用和一些内部業務需求的新特性。

通過引入 Hudi 的 Data lake 方案,Shopee 的 Data Mart、推薦、ShopeeVideo 等産品的資料處理流程實作了流批一體、增量處理的特性,很大程度上簡化了這一流程,并提升了性能。

1. Shopee 資料系統建設中面臨的典型問題

1.1 Shopee 資料系統簡介

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

上圖是 Shopee Data Infrastructure 團隊為公司内部業務方提供的一套整體解決方案。

  • 第一步是資料內建(Data Integration),目前我們提供了基于日志資料、資料庫和業務事件流的資料內建方式;
  • 然後通過平台的 ETL(Extract Transform Load)服務 load 到業務的數倉中,業務同學通過我們提供的開發平台和計算服務進行資料處理;
  • 最後的結果資料通過 Dashboard 進行展示,使用即時查詢引擎進行資料探索,或者通過資料服務回報到業務系統中。

下面先來分析一下 Shopee 資料系統建設中遇到的三個典型問題。

1.2 流批一體的資料內建

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

第一個問題:在基于資料庫的資料內建過程中,存在同一份資料同時面臨流處理和批處理的需求。傳統的做法是實作全量導出和 CDC 兩條鍊路。全量導對外連結路滿足批處理的需求,CDC 鍊路用于實時處理和增量處理的場景。

然而,這種做法存在的一個問題是全量導出效率低,導緻資料庫負載高。另外,資料一緻性也難以得到保證。

同時,在批資料集建構上有一定的存儲效率優化,是以我們希望基于 CDC 資料去建構批資料集,以此同時滿足三種處理場景的需求,提高資料時效性。

1.3 狀态表明細存儲

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

第二個問題是狀态表明細的存儲。我們可以認為,傳統批資料集是在某一時間點對業務資料整體狀态的一個快照,壓縮到一個點的快照會合并掉業務流程中的過程資訊。這些變化過程反映了使用者使用我們服務的過程,是非常重要的分析對象。一旦被合并掉,将無法展開。

另外,在很多場景下,業務資料每天變化的部分隻占全量資料的一小部分,每個批次都全量存儲會帶來很大的資源浪費。

1.4 大寬表建立

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

第三個問題是大寬表的建立。近實時寬表建構是資料進行中常見的一種場景,它存在的問題是傳統的批處理延遲過高,使用流式計算引擎資源浪費嚴重。是以,我們基于多個資料集合建構了業務寬表,支援 Ad hoc 類 OLAP 查詢。

2. 為什麼選擇 Hudi

針對上述業務中遇到的問題,基于以下三點考量,最終我們選擇 Apache Hudi 來作為解決方案。

2.1 生态支援豐富

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

我們期望使用純流式的方式建設資料內建環境,而 Hudi 對流式場景有着良好的支援。

第二點是對各個大資料生态的相容。我們建構的資料集将會同時存在批處理、流處理、增量處理和動态探索等多種需求的負載。目前這些工作負載運作在各種計算引擎中,是以,對多種計算引擎的支援也在我們的考慮範圍之内。

另一個考量點則是和 Shopee 業務需求的契合。目前,我們亟待處理的資料集大部分來源于業務系統,都帶有唯一性辨別資訊,是以 Hudi 的設計更加符合我們的資料特性。

2.2 插件化的能力

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

目前我們平台提供 Flink 和 Spark 作為通用計算引擎,作為資料內建和數倉建設負載的承載者,同時也使用 Presto 承載資料探索的功能。Hudi 對這三者都支援。

在實際使用中,根據業務資料的重要程度不同,我們也會給使用者提供不同的資料索引方式。

2.3 業務特性比對

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

在資料內建過程中,使用者的 schema 變化是一個非常常見的需要。ODS 的資料變化可能導緻下遊的計算任務出錯。同時,在增量處理時,我們需要時間處理的語義。支援主鍵資料的存儲對于我們業務資料庫的資料來說,意義重大。

3. Shopee 在 Hudi 落地過程中的實踐

3.1 實時資料內建

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

目前 Shopee 内部有大量的業務資料來自業務資料庫,我們采用類似 CDC 的技術擷取資料庫中的變更資料,給業務方建構支援批處理和近實時增量處理的 ODS 層資料。

當一個業務方的資料需要接入時,我們會在進行增量實時內建之前先做一次全量 Bootstrap,建構基礎表,然後基于新接入的 CDC 資料進行實時建構。

建構的過程中,我們一般根據使用者需求選擇建構的 COW 表或者 MOR 表。

1)問題建構與解決方案

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

在進行實時建構的過程中,存在以下兩種較為常見的問題:

一種是使用者将有大量變更的資料集的類型配置為 COW 表,導緻資料寫放大。此時我們需要做的事情是建立相應的監控來識别這種配置。同時,我們基于 MOR 表的配置化資料合并邏輯,支援資料檔案的同步或者異步更新。

第二個問題是預設的 Bloom filter 導緻資料存在性判斷的問題。這裡比較好的方式是采用 HBase Index 解決超大資料集的寫入問題。

2)問題解決的效果

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

這是将我們的某些資料內建鍊路換成基于 Hudi 的實時內建後的效果。上圖是資料可見性占比與時延的關系,目前我們能保證 80% 的資料在 10 分鐘内可見可用,所有的資料 15 分鐘内可見可用。

下圖是我們統計的資源消耗占比圖。藍色部分是實時鍊路的資源消耗,紅色是曆史的按批資料內建的資源消耗。

因為切換成了實時鍊路,對于一些大表重複率低的資料減少了重複處理,同時也減少了集中式處理效率降低導緻的資源消耗。是以,我們的資源消耗遠低于批處理方式。

3.2 增量視圖

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

針對使用者需要狀态明細的場景,我們提供了基于 Hudi Savepoint 功能的服務,按照使用者需要的時間周期,定期建構快照(snapshot),這些快照以分區的形式存在中繼資料管理系統中。

使用者可以友善地在 Flink、Spark,或者 Presto 中利用 SQL 去使用這些資料。因為資料存儲是完整且沒有合并的明細,是以資料本身支援全量計算,也支援增量處理。

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

在使用增量視圖的存儲時,對于一些變化資料占比不大的場景,會取得比較好的存儲節省效果。

這裡有一則簡單的公式,用于計算空間使用率:

(1 + (t - 1) * p ) / t

其中,P 表示變化資料的占比,t 表示需要儲存的時間周期數。變化資料占比越低,所帶來的存儲節省越好。對于長周期資料,也會有一個比較好的節省效果。

同時,這種方式對增量計算的資源節省效果也比較好。缺點是按批全量計算會有一定的讀放大的問題。

3.3 增量計算

當我們的資料集基于 Hudi MOR 表來建構時,就可以同時支援批處理、增量處理和近實時處理負載。

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

以圖為例,Table A 是一個增量的 MOR 表,當我們基于 Table A 來建構後續的表 B 和表 C 時,如果計算邏輯都支援增量的建構,那我們在計算的過程中,隻需要擷取新增的資料和變化的資料。這樣在計算的過程中就顯著減少了參與計算的資料量。

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

這裡是離線計算平台基于 Hudi 的增量計算來建構的一個近實時的使用者作業分析。當使用者送出一個 Spark 任務到叢集運作,任務結束後會自動收集使用者的日志,并從中提取相關的 Metric 和關鍵日志寫入到 Hudi 表。然後一個處理任務增量讀取這些日志,分析出任務的優化項,以供使用者參考。

當一個使用者作業運作完後,一分鐘之内就可以分析出使用者的作業情況,并形成分析報告提供給使用者。

增量 Join

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐
基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

除了增量計算,增量的 Join 也是一個非常重要的應用場景。

相對于傳統的 Join,增量計算隻需要根據增量資料查找到需要讀取的資料檔案,進行讀取,并分析出需要重寫的分區,重新寫入。

相對于全量來說,增量計算顯著減少了參與計算的資料量。

Merge Into

Merge Into 是在 Hudi 中非常實用的一個用于建構實時寬表的技術,它主要基于 Partial update 來實作。

MERGE INTO target_table t0
USING SOURCE TABLE s0
ON t0.id = s0.id
WHEN matched THEN UPDATE SET
t0.price = s0.price+5,
_ts = s0.ts;
           
MERGE INTO target_table_name [target_alias]
USING source_table_reference [source_alias]
ON merge_condition
[ WHEN MATCHED [ AND condition ] THEN matched_action ] [...]
[ WHEN NOT MATCHED [ AND condition ] THEN not_matched_action ] [...]

matched_action
{ DELETE |
UPDATE SET * |
UPDATE SET { column1 = value1 } [, ...] }

not_matched_action
{ INSERT * |
INSERT (column1 [, ...] ) VALUES (value1 [, ...])
           

這裡展示了基于 Spark SQL 的 Merge Into 文法,它讓使用者建構寬表的作業開發變得非常簡單。

基于 Merge Into 的增量 Join 實作

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

Hudi 的實作是采用 Payload 的方式,在一個 Payload 中可以隻存在一張表的部分列。

增量資料的 Payload 被寫入到 log 檔案中,然後在後續的合并中生成使用者使用的寬表。因為後續合并存在時間延遲,是以我們優化了合并的寫入邏輯。

在資料合并完成後,我們會在中繼資料管理中寫入一個合并的資料時間和相關的 DML,然後在讀取這張 MOR 表的過程中分析 DML 和時間,為資料可見性提供保障。

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

而采用 Partial Update 的好處是:

  • 顯著降低了流式建構大寬表的資源使用;
  • 檔案級别的資料修改時,處理效率增高。

4. 社群貢獻

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

在解決處理 Shopee 内部業務問題的同時,我們也貢獻了一批代碼到社群,将内部的優化和新特性分享出來,比較大的 feature 有

meta sync(RFC-55 已完成)

snapshot view(RFC-61)

partial update(HUDI-3304)

FileSystemLocker(HUDI-4065 已完成)

等等;同時也幫助社群修複了很多 bug。後續也希望能夠用這種方式,更好地滿足業務需求的同時,參與社群共建。

4.1 Snapshot View

增量視圖(snapshot view)有以下幾個典型應用場景:

  • 每天在基礎表上生成名稱為

    compacted-YYYYMMDD

    的快照,使用者使用快照表生成每日的衍生資料表,并計算報表資料。當使用者下遊的計算邏輯發生變化時,能夠選擇對應快照進行重新計算。還可以設定留存期為 X 天,每天清理掉過期資料。這裡其實也可以在多快照的資料上自然地實作 SCD-2。
  • 一個命名為

    yyyy-archived

    的存檔分支可以每年在資料進行壓縮和優化之後生成,如果我們的儲存政策有變化(例如要删除敏感資訊),那麼可以在進行相關的操作之後,在這個分支上生成一個新的快照。
  • 一個命名為

    preprod-xx

    的快照可以在進行了必要的品質檢查之後再正式釋出給使用者,避免外部工具與 pipeline 本身的耦合。

對于 snapshot view 的需求,Hudi 已經可以在一定程度上通過兩個關鍵特性來做支援:

  • Time travel:使用者可以提供一個時間點來查詢對應時間上的 Hudi 表快照資料。
  • Savepoint:可以保證某個 commit 時間點的快照資料不會被清理,而在 savepoint 之外的中間資料仍然可以被清理。

簡單的實作如下圖所示:

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

但是在實際的業務場景中,為了滿足使用者的 snapshot view 需求,還需要從易用性和可用性上考慮更多。

例如,使用者如何得知一個 snapshot 已經正确地釋出出來了?這其中包含的一個問題是可見性,也就是說,使用者應該可以在整個 pipeline 中顯式地拿到 snapshot 表,這裡就需要提供類似 Git 的 tag 功能,增強易用性。

另外,在打快照的場景中,一個常見的需求是資料的精準切分。一個例子就是使用者其實不希望 event time 在 1 号的資料漂移到 2 号的快照之中,更希望的做法是在每個 FileGroup 下結合 watermark 做精細的 instant 切分。

為了更好地滿足生産環境中的需求,我們實作了以下優化:

  • 擴充了 savepoint metadata,在此基礎上實作快照的 tag、branch 以及 lifecycle 管理,和自動的 meta 同步功能;
  • 在 MergeOnRead 表上實作精細化的 ro 表 base file 切分,在 compaction 的時候通過 watermark 切分日志檔案,保證 snapshot 的精确性。也就是說,我們可以在流式寫入的基礎上,給下遊的離線處理提供精确到 0 點的資料。

目前我們正在将整體功能通過 RFC-61 貢獻回社群,實際落地過程的收益前面章節已有介紹,這裡不再贅述。

4.2 多源 Partial update

前文簡單介紹了多源部分列更新(大寬表拼接)的場景,我們依賴 Hudi 的多源合并能力在存儲層實作 Join 的操作,大大降低了計算層在 state 和 shuffle 上的壓力。

目前,我們主要是通過 Hudi 内部的 Payload 接口實作多源的部分列更新。下面這張圖展示了 Payload 在 Hudi 的寫端和讀端的互動流程。

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

實作的原理基本上就是通過自定義的 Payload class 來實作相同 key 不同源資料的合并邏輯,寫端會在批次内做多源的合并并寫入 log,讀端在讀時合并時也會調用相同的邏輯來處理跨批次的情況。

這裡需要注意的是亂序和遲到資料(out-of-order and late events)的問題。如果不做處理,在下遊經常會導緻舊資料覆寫新資料,或者列更新不完整的情況。

針對亂序和遲到資料,我們對 Hudi 做了 Multiple ordering value 的增強,保證每個源隻能更新屬于自己那部分列的資料,并且可以根據設定的

event time (ordering value)

列,確定隻會讓新資料覆寫舊資料。

後續我們還準備結合 lock less multiple writers 來實作多 Job 多源的并發寫入。

5. 總結與展望

針對在 Shopee 資料系統建設中面臨的問題,我們提出了湖倉一體的解決方案,通過對比選型選擇了 Hudi 作為核心元件。

在落地過程中,我們通過使用 Hudi 的核心特性以及在此之上的擴充改造,分别滿足了三個主要的使用者需求場景:實時資料內建、增量視圖和增量計算。并為使用者帶來了低延時(約 10 分鐘)、降低計算資源消耗、降低存儲消耗等收益。

接下來,我們還将提供更多特性,并針對以下兩個方面做進一步完善,進而滿足使用者更多的場景,提供更好的性能。

5.1 跨任務并發寫支援

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

目前 Hudi 支援了基于檔案鎖的單個任務單 writer 的寫入方式。

但是在實際中,有一些場景需要多個任務多 writer 同時寫入,且寫入分區有交叉,目前的 OCC 對這種情況支援不佳。目前我們正在與社群合作解決 Flink 與 Spark 多重 writer 的場景。

5.2 性能優化

基于 Apache Hudi 的湖倉一體技術在 Shopee 的實踐

中繼資料讀取以及 File listing 操作無論是在寫入端還是讀取端都會有很大的性能消耗,海量的分區對外部中繼資料系統(比如 HMS)也會造成很大壓力。

針對這一問題,我們計劃第一步将 schema 之外的資訊存儲從 HMS 過渡到 MDT;第二步是在未來使用一個獨立的 MetaStore 和 Table service 的 server,不再強耦合于 HDFS。

在這個 server 中,我們可以更容易地優化讀取性能,更靈活地進行資源調整。