作者:王祥虎(Apache Hudi 社群)
Apache Hudi 是由 Uber 開發并開源的資料湖架構,它于 2019 年 1 月進入 Apache 孵化器孵化,次年 5 月份順利畢業晉升為 Apache 頂級項目。是目前最為熱門的資料湖架構之一。
1. 為何要解耦
Hudi 自誕生至今一直使用 Spark 作為其資料處理引擎。如果使用者想使用 Hudi 作為其資料湖架構,就必須在其平台技術棧中引入 Spark。放在幾年前,使用 Spark 作為大資料處理引擎可以說是很平常甚至是理所當然的事。因為 Spark 既可以進行批處理也可以使用微批模拟流,流批一體,一套引擎解決流、批問題。然而,近年來,随着大資料技術的發展,同為大資料處理引擎的 Flink 逐漸進入人們的視野,并在計算引擎領域獲占據了一定的市場,大資料處理引擎不再是一家獨大。在大資料技術社群、論壇等領地,Hudi 是否支援使用 Flink 計算引擎的的聲音開始逐漸出現,并日漸頻繁。是以使 Hudi 支援 Flink 引擎是個有價值的事情,而內建 Flink 引擎的前提是 Hudi 與 Spark 解耦。
同時,縱觀大資料領域成熟、活躍、有生命力的架構,無一不是設計優雅,能與其他架構互相融合,彼此借力,各專所長。是以将 Hudi 與 Spark 解耦,将其變成一個引擎無關的資料湖架構,無疑是給 Hudi 與其他元件的融合創造了更多的可能,使得 Hudi 能更好的融入大資料生态圈。
2. 解耦難點
Hudi 内部使用 Spark API 像我們平時開發使用 List 一樣稀松平常。自從資料源讀取資料,到最終寫出資料到表,無處不是使用 Spark RDD 作為主要資料結構,甚至連普通的工具類,都使用 Spark API 實作,可以說 Hudi 就是用 Spark 實作的一個通用資料湖架構,它與 Spark 的綁定可謂是深入骨髓。
此外,此次解耦後內建的首要引擎是 Flink。而 Flink 與 Spark 在核心抽象上差異很大。Spark 認為資料是有界的,其核心抽象是一個有限的資料集合。而 Flink 則認為資料的本質是流,其核心抽象 DataStream 中包含的是各種對資料的操作。同時,Hudi 内部還存在多處同時操作多個 RDD,以及将一個 RDD 的處理結果與另一個 RDD 聯合處理的情況,這種抽象上的差別以及實作時對于中間結果的複用,使得 Hudi 在解耦抽象上難以使用統一的 API 同時操作 RDD 和 DataStream。
3. 解耦思路
理論上,Hudi 使用 Spark 作為其計算引擎無非是為了使用 Spark 的分布式計算能力以及 RDD 豐富的算子能力。抛開分布式計算能力外,Hudi 更多是把 RDD 作為一個資料結構抽象,而 RDD 本質上又是一個有界資料集,是以,把 RDD 換成 List,在理論上完全可行(當然,可能會犧牲些性能)。為了盡可能保證 Hudi Spark 版本的性能和穩定性。我們可以保留将有界資料集作為基本操作機關的設定,Hudi 主要操作 API 不變,将 RDD 抽取為一個泛型,Spark 引擎實作仍舊使用 RDD,其他引擎則根據實際情況使用 List 或者其他有界資料集。
解耦原則:
1)統一泛型。Spark API 用到的 JavaRDD,JavaRDD,JavaRDD 統一使用泛型 I,K,O 代替;
2)去 Spark 化。抽象層所有 API 必須與 Spark 無關。涉及到具體操作難以在抽象層實作的,改寫為抽象方法,引入 Spark 子類實作。
例如:Hudi 内部多處使用到了 JavaSparkContext#map() 方法,去 Spark 化,則需要将 JavaSparkContext 隐藏,針對該問題我們引入了 HoodieEngineContext#map() 方法,該方法會屏蔽 map 的具體實作細節,進而在抽象成實作去 Spark 化。
3)抽象層盡量減少改動,保證 Hudi 原版功能和性能;
4)使用 HoodieEngineContext 抽象類替換 JavaSparkContext,提供運作環境上下文。
4.Flink 內建設計
Hudi 的寫操作在本質上是批處理,DeltaStreamer 的連續模式是通過循環進行批處理實作的。為使用統一 API,Hudi 內建 Flink 時選擇攢一批資料後再進行處理,最後統一進行送出(這裡 Flink 我們使用 List 來攢批資料)。
攢批操作最容易想到的是通過使用時間視窗來實作,然而,使用視窗,在某個視窗沒有資料流入時,将沒有輸出資料,Sink 端難以判斷同一批資料是否已經處理完。是以我們使用 Flink 的檢查點機制來攢批,每兩個 Barrier 之間的資料為一個批次,當某個子任務中沒有資料時,mock 結果資料湊數。這樣在 Sink 端,當每個子任務都有結果資料下發時即可認為一批資料已經處理完成,可以執行 commit。
DAG 如下:

- source 接收 Kafka 資料,轉換成 List;
- InstantGeneratorOperator 生成全局唯一的 instant.當上一個 instant 未完成或者目前批次無資料時,不建立新的 instant;
- KeyBy partitionPath 根據 partitionPath 分區,避免多個子任務寫同一個分區;
- WriteProcessOperator 執行寫操作,當目前分區無資料時,向下遊發送空的結果資料湊數;
- CommitSink 接收上遊任務的計算結果,當收到 parallelism 個結果時,認為上遊子任務全部執行完成,執行 commit.
注:InstantGeneratorOperator 和 WriteProcessOperator 均為自定義的 Flink 算子,InstantGeneratorOperator 會在其内部阻塞檢查上一個 instant 的狀态,保證全局隻有一個 inflight(或 requested)狀态的 instant.WriteProcessOperator 是實際執行寫操作的地方,其寫操作在 checkpoint 時觸發。
5. 實作示例
1) HoodieTable
/**
* Abstract implementation of a HoodieTable.
*
* @param <T> Sub type of HoodieRecordPayload
* @param <I> Type of inputs
* @param <K> Type of keys
* @param <O> Type of outputs
*/
public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {
protected final HoodieWriteConfig config;
protected final HoodieTableMetaClient metaClient;
protected final HoodieIndex<T, I, K, O> index;
public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,
I records);
public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,
I records);
public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
......
}
HoodieTable 是 Hudi 的核心抽象之一,其中定義了表支援的 insert,upsert,bulkInsert 等操作。以 upsert 為例,輸入資料由原先的 JavaRDD inputRdds 換成了 I records, 運作時 JavaSparkContext jsc 換成了 HoodieEngineContext context.
從類注釋可以看到 T,I,K,O 分别代表了 Hudi 操作的負載資料類型、輸入資料類型、主鍵類型以及輸出資料類型。這些泛型将貫穿整個抽象層。
2) HoodieEngineContext
/**
* Base class contains the context information needed by the engine at runtime. It will be extended by different
* engine implementation if needed.
*/
public abstract class HoodieEngineContext {
public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);
public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);
public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);
......
}
HoodieEngineContext 扮演了 JavaSparkContext 的角色,它不僅能提供所有 JavaSparkContext 能提供的資訊,還封裝了 map,flatMap,foreach 等諸多方法,隐藏了 JavaSparkContext#map(),JavaSparkContext#flatMap(),JavaSparkContext#foreach() 等方法的具體實作。
以 map 方法為例,在 Spark 的實作類 HoodieSparkEngineContext 中,map 方法如下:
@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();
}
在操作 List 的引擎中其實作可以為(不同方法需注意線程安全問題,慎用 parallel()):
@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
return data.stream().parallel().map(func::apply).collect(Collectors.toList());
}
注:map 函數中抛出的異常,可以通過包裝 SerializableFunction func 解決.
這裡簡要介紹下 SerializableFunction:
@FunctionalInterface
public interface SerializableFunction<I, O> extends Serializable {
O apply(I v1) throws Exception;
}
該方法實際上是 java.util.function.Function 的變種,與java.util.function.Function 不同的是 SerializableFunction 可以序列化,可以抛異常。引入該函數是因為 JavaSparkContext#map() 函數能接收的入參必須可序列,同時在hudi的邏輯中,有多處需要抛異常,而在 Lambda 表達式中進行 try catch 代碼會略顯臃腫,不太優雅。
6.現狀和後續計劃
6.1 工作時間軸
2020 年 4 月,T3 出行(楊華@vinoyang,王祥虎@wangxianghu)和阿裡巴巴的同學(李少鋒@leesf)以及若幹其他小夥伴一起設計、敲定了該解耦方案;
2020 年 4 月,T3 出行(王祥虎@wangxianghu)在内部完成了編碼實作,并進行了初步驗證,得出方案可行的結論;
2020 年 7 月,T3 出行(王祥虎@wangxianghu)将該設計實作和基于新抽象實作的 Spark 版本推向社群(HUDI-1089);
2020 年 9 月 26 日,順豐科技基于 T3 内部分支修改完善的版本在 Apache Flink Meetup(深圳站)公開 PR, 使其成為業界第一個線上上使用 Flink 将資料寫 Hudi 的企業。
2020 年 10 月 2 日,HUDI-1089 合并入 Hudi 主分支,标志着 Hudi-Spark 解耦完成。
6.2 後續計劃
1)推進 Hudi 和 Flink 內建
将 Flink 與 Hudi 的內建盡快推向社群,初期該特性可能隻支援 Kafka 資料源。
2)性能優化
為保證 Hudi-Spark 版本的穩定性和性能,此次解耦沒有太多考慮 Flink 版本可能存在的性能問題。
3)類 flink-connector-hudi 第三方包開發
将 Hudi-Flink 的綁定做成第三方包,使用者可以在 Flink 應用中以編碼方式讀取任意資料源,通過這個第三方包寫入 Hudi。
更多 Flink 技術交流可加入 Apache Flink 社群釘釘交流群: