本文介紹了百信銀行實時計算平台的建設情況,實時資料湖建構在 Hudi 上的方案和實踐方法,以及實時計算平台內建 Hudi 和使用 Hudi 的方式。内容包括:
- 背景
- 百信銀行基于 Flink 的實時計算平台設計與實踐
- 百信銀行實時計算平台與實時資料湖的內建實踐
- 百信銀行實時資料湖的未來
- 總結
GitHub 位址
https://github.com/apache/flink歡迎大家給 Flink 點贊送 star~
一、背景
百信銀行,全稱為 “中信百信銀行股份有限公司”,是首家獲批獨立法人形式的直銷銀行。作為首家國有控股的網際網路銀行,相比于傳統金融行業,百信銀行對資料靈活性有更高的要求。
資料靈活,不僅要求資料的準确性,還要求資料到達的實時性,和資料傳輸的安全性。為了滿足我行資料靈活性的需求,百信銀行大資料部承擔起了建設實時計算平台的職責,保證了資料快速,安全且标準得線上送達。
受益于大資料技術的發展和更新疊代,目前廣為人知的批流一體的兩大支柱分别是:“統一計算引擎” 與 “統一存儲引擎”。
- Flink,作為大資料實時計算領域的佼佼者,1.12 版本的釋出讓它進一步提升了統一計算引擎的能力;
- 同時随着資料湖技術 Hudi 的發展,統一存儲引擎也迎來了新一代技術變革。
在 Flink 和 Hudi 社群發展的基礎上,百信銀行建構了實時計算平台,同時将實時資料湖 Hudi 內建到實時計算平台之上。結合行内資料治理的思路,實作了資料實時線上、安全可靠、标準統一,且有靈活資料湖的目标。
二、百信銀行基于 Flink 的實時計算平台設計與實踐
1. 實時計算平台的定位
實時計算平台作為行級實時計算平台,由大資料 IaaS 團隊自主研發,是一款實作了實時資料 ”端到端“ 的線上資料加工處理的企業級産品。
- 其核心功能具備了實時采集、實時計算、實時入庫、複雜時間處理、規則引擎、可視化管理、一鍵配置、自主上線,和實時監控預警等。
- 目前其支援的場景有實時數倉、斷點召回、智能風控、統一資産視圖、反欺詐,和實時特征變量加工等。
- 并且,它服務着行内小微、信貸、反欺詐、消金、财務,和風險等衆多業務線。
截止目前,線上穩定運作的有 320+ 的實時任務,且線上運作的任務 QPS 日均達到 170W 左右。
2. 實時計算平台的架構
按照功能來劃分的話,實時計算平台的架構主要分為三層:
■ 1)資料采集層
采集層目前主要分為兩個場景:
-
第一個場景是采集 MySQL 備庫的 Binlog 日志到 Kafka 中。我行所使用的資料采集方案并沒有采用業界普遍用的如 Canal,Debezium 等現有的 CDC 方案。
1、因為我們的 MySQL 版本為百信銀行内部的版本,Binlog 協定有所不同,是以現有的技術方案不能很好的支援相容我們擷取 Binlog 日志。
2、同時,為了解決我們資料源 MySQL 的備庫随時可能因為多機房切換,而造成采集資料丢失的情況。我們自研了讀取 MySQL Binlog 的 Databus 項目,我們也将 Databus 邏輯轉化成了 Flink 應用程式,并将其部署到了 Yarn 資源架構中,使 Databus 資料抽取可以做到高可用,且資源可控。
-
第二個場景是,我們對接了第三方的應用,這個第三方應用會将資料寫入 Kafka,而寫入 Kafka 有兩種方式:
1、一種方式是依據我們定義的 Json shcema 協定。
(UMF協定:{col_name:””,umf_id":"","umf_ts":,"umf_op_":"i/u/d"})
協定定義了 ”唯一 id”,”時間戳“ 和 ”操作類型“。根據此協定,使用者可以指定對該消息的操作類型,分别是 "insert","update" 和 "delete",以便下遊對消息進行針對性處理。
2、另外一種方式,使用者直接把 JSON 類型的資料寫到 kafka 中,不區分操作類型。
■ 2)資料計算轉換層
消費 Kafka 資料進行一層轉換邏輯,支援使用者自定義函數,将資料标準化,做敏感資料的脫敏加密等。
■ 3)資料存儲層
資料存儲到 HDFS,Kudu,TiDB,Kafka,Hudi,MySQL 等儲存媒體中。

在上圖所示的架構圖中,我們可以看到整體實時計算平台支援的主要功能有:
-
開發層面:
1、支援标準化的 DataBus 采集功能,該功能對于支援 MySQL Binglog 同步到 Kafka 做了同步适配,不需要使用者幹預配置過多。使用者隻需要指定資料源 MySQL 的執行個體就可以完成到 Kafka 的标準化同步。
2、支援使用者可視化編輯 FlinkSQL。
3、支援使用者自定義 Flink UDF 函數。
4、支援複雜事件處理(CEP)。
5、支援使用者上傳打包編譯好 Flink 應用程式。
-
運維層面:
1、支援不同類型任務的狀态管理,支援savepoint。
2、支援端到端的延遲監控,告警。
在實時計算平台更新疊代的過程中,社群 Flink 版本之間存在一些向下不相容的情況。為了平滑的更新 Flink 版本,我們對計算引擎的多版本子產品進行統一的抽象,将多版本之間做了嚴格的 JVM 級别隔離,使版本之間不會産生 Jar 包沖突,Flink Api 不相容的情況。
如上圖所示,我們将不同的 Flink 版本封裝到一個獨立的虛拟機中,使用 Thrift Server 啟動一個獨立的 JVM 虛拟機,每個版本的 Flink 都會有一個獨立的 Thrift Server。在使用的過程中,隻要使用者顯示指定的 Flink 版本,Flink 應用程式就會被指定的 Thrift Server 啟動。同時,我們也将實時計算的後端服務嵌入一個常用的 Flink 版本,避免因為啟動 Thrift Server 而占用過多的啟動時間。
同時為了滿足金融系統高可用和多備的需求,實時計算平台也開發了多 Hadoop 叢集的支援,支援實時計算任務在失敗後可以遷移到備叢集上去。整體的方案是,支援多叢集 checkpoint,savepoint,支援任務失敗後,可以在備機房重新開機實時任務。
三、百信銀行實時計算平台與實時資料湖內建實踐
在介紹本内容之前,我們先來了解一些我行目前在資料湖的現狀。目前的實時資料湖,我行依然采用主流的 Lambda 架構來建構資料倉庫。
1. Lambda
Lambda 架構下,數倉的缺點:
- 同樣的需求,開發和維護兩套代碼邏輯:批和流兩套邏輯代碼都需要開發和維護,并且需要維護合并的邏輯,且需同時上線;
- 計算和存儲資源占用多:同樣的計算邏輯計算兩次,整體資源占用會增多;
- 資料具有二義性:兩套計算邏輯,實時資料和批量資料經常對不上,準确性難以分辨;
- 重用 Kafka 消息隊列:Kafka 保留往往按照天或者月保留,不能全量保留資料,無法使用現有的 adhoc 查詢引擎分析。
2. Hudi
為了解決 Lambda 架構的痛點,我行準備了新一代的資料湖技術架構,同時我們也花大量的時間調研了現有的資料湖技術,最終選擇 Hudi 作為我們的存儲引擎。
- Update / Delete 記錄:Hudi 使用細粒度的檔案/記錄級别索引,來支援 Update / Delete 記錄,同時還提供寫操作的事務保證,支援 ACID 語義。查詢會處理最後一個送出的快照,并基于此輸出結果;
- 變更流:Hudi 對擷取資料變更提供了流的支援,可以從給定的時間點擷取給定表中已 updated / inserted / deleted 的所有記錄的增量流,可以查詢不同時間的狀态資料;
- 技術棧統一:可以相容我們現有的 adhoc 查詢引擎 presto,spark。
- 社群更新疊代速度快:已經支援 Flink 兩種不同方式的的讀寫操作,如 COW 和 MOR。
在新的架構中可以看到,我們将實時和批處理貼源層的資料全部寫到 Hudi 存儲中,并重新寫入到新的資料湖層 datalake(Hive 的資料庫)。出于曆史的原因,為了相容之前的資料倉庫的模型,我們依然保留之前的 ODS 層,曆史的數倉模型保持不變,隻不過 ODS 貼源層的資料需要從 datalake 層擷取。
-
首先,我們可以看到,對于新的表的入倉邏輯,我們通過實時計算平台使用 Flink 寫入到 datalake 中(新的貼源層,Hudi 格式存儲),資料分析師和資料科學家,可以直接使用 datalake 層的資料進行資料分析和機器學習模組化。如果資料倉庫的模型需要使用 datalake 的資料源,需要一層轉換 ODS 的邏輯,這裡的轉換邏輯分為兩種情況:
1、第一種,對于增量模型,使用者隻需要将最新 datalake 的分區使用快照查詢放到 ODS 中即可。
2、第二種,對于全量模型,使用者需要把 ODS 前一天的快照和 datalake 最新的快照查詢的結果進行一次合并,形成最新的快照再放到 ODS 目前的分區中,以此類推。
我們這麼做的原因是,對于現有的數倉模型不用改造,隻是把 ODS 的資料來源換成 datalake,時效性強。同時滿足了資料分析和資料科學家準實時擷取資料的訴求。
-
另外,對于原始的 ODS 存在的資料,我們開發了将 ODS 層的資料進行了一次初始化入 datalake 的腳本。
1、如果 ODS 層資料每天是全量的快照,我們隻将最新的一次快照資料初始化到 datalake 的相同分區,然後實時入 datalake 的鍊路接入;
2、如果 ODS 層的資料是增量的,我們暫時不做初始化,隻在 datalake 中重建立一個實時入湖的鍊路,然後每天做一次增量日切到 ODS 中。
- 最後,如果是一次性入湖的資料,我們使用批量入湖的工具導入到 datalake 中即可。
整體湖倉轉換的邏輯如圖:
3. 技術挑戰
-
在我們調研的初期,Hudi 對 Flink 的支援不是很成熟,我們對 Spark - StrunctStreaming 做了大量的開發和測試。從我們 PoC 測試結果上看,
1、如果使用無分區的 COW 寫入的方式,在千萬級寫入量的時候會發現寫入越來越慢;
2、後來我們将無分區的改為增量分區的方式寫入,速度提升了很多。
之是以會産生這個問題,是因為 spark 在寫入時會讀取 basefile 檔案索引,檔案越大越多,讀取檔案索引就會越慢,是以會産生寫入越來越慢的情況。
-
同時,随着 Flink 對 hudi 支援越來越好,我們的目标是打算将 Hudi 入湖的功能內建到實時計算平台。是以,我們把實時計算平台對 Hudi 做了內建和測試,期間也遇到一些問題,典型的問題有:
1、類沖突
2、不能找到 class 檔案
3、rocksdb 沖突
為了解決這些不相容的問題,我們将對 Hudi 的依賴,重新構造了一個獨立的子產品,這個工程隻是把 Hudi 的依賴打包成一個 shade package。
4、當有依賴沖突時,我們會把 Flink 子產品相關或者 Hudi 子產品相關的沖突依賴 exclude 掉。
5、而如果有其他依賴包找不到的情況,我們會把相關的依賴通過 pom 檔案引入進來。
- 在使用 Hudi on Flink 的方案中,也遇到了相關的問題,比如,checkpoint 太大導緻 checkpoint 時間過長而引起的失敗。這個問題,我們設定狀态的 TTL 時間,把全量 checkpoint 改為增量 checkpoint,且提高并行度來解決。
-
COW 和 MOR 的選擇。目前我們使用的 Hudi 表以 COW 居多,之是以選擇 COW,
1、第一是因為我們目前曆史存量 ODS 的資料都是一次性導入到 datalake 資料表中,不存在寫放大的情況。
2、另外一個原因是,COW 的工作流比較簡單,不會涉及到 compaction 這樣的額外操作。
如果是新增的 datalake 資料,并且存在大量的 update,并且實時性要求較高的情況下,我們更多的選擇 MOR 格式來寫,尤其寫 QPS 比較大的情況下,我們會采用異步 compaction 的操作,避免寫放大。除了這種情況外,我們還是會更傾向以 COW 的格式來寫。
四、百信銀行實時資料湖的未來
在我行實時資料湖的架構中,我們的目标是将實時數倉的整個鍊路建構在Hudi之上,架構體系如圖:
我們整體的目标規劃是替代 kafka,把 Hudi 作為中間存儲,将數倉建設在 Hudi 之上,并以 Flink 作為流批一體計算引擎。這樣做的好處有:
- MQ 不再擔任實時資料倉庫存儲的中間存儲媒體,而 Hudi 存儲在 HDFS 上,可以存儲海量資料集;
- 實時資料倉庫中間層可以使用 OLAP 分析引擎查詢中間結果資料;
- 真正意義上的批流一體,資料 T+1 延遲的問題得到解決;
- 讀時 Schema 不再需要嚴格定義 Schema 類型,支援 schema evolution;
- 支援主鍵索引,資料查詢效率數倍增加,并且支援 ACID 語義,保證資料不重複不丢失;
- Hudi 具有 Timeline 的功能,可以更多存儲資料中間的狀态資料,資料完備性更強。
五、總結
本文介紹了百信銀行實時計算平台的建設情況,實時資料湖建構在 Hudi 上的方案和實踐方法,以及實時計算平台內建 Hudi 和使用 Hudi 的方式。
在使用 Hudi 的過程中,也遇到一些問題,由衷感謝社群同學的幫助。特别感謝社群 Danny chan,leesf 解疑答惑。在實時資料湖架構體系下,建構我們實時數倉,流批一體方案還是在摸索中。
僅以此篇,希望能給其他正在建設實時計算平台,和使用 Hudi 建構實時資料湖的同學提供一些參考。我們也誠懇邀請對實時計算平台和實時資料湖有濃厚興趣的同學加入我們,投遞履歷的方式如下。
更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群~
活動推薦
阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:
99元試用
實時計算Flink版(包年包月、10CU)即有機會獲得 Flink 獨家定制T恤;另包3個月及以上還有85折優惠!
了解活動詳情:
https://www.aliyun.com/product/bigdata/sc