作者 | 蔡适擇(順豐大資料平台負責人)
整理 | 趙陽(Flink 社群志願者)
本文主要介紹順豐在資料倉庫的資料實時化、資料庫 CDC、Hudi on Flink 上的實踐應用及産品化經驗。文章主要分為以下幾部分:
● 順豐業務介紹
● Hudi on Flink
● 産品化支援
● 後續計劃
1、順豐業務
1.1 順豐大資料的應用
先來看一下順豐大資料業務的全景圖。
大資料平台,中間的基礎部分是大資料平台,這塊是順豐結合開源元件自行搭建的。與之相關的是大資料分析與人工智能,順豐有一個非常強的地面部隊,就是線下的快遞小哥以及運輸車輛,需要使用 AI 以及大資料分析來輔助管理,提升整體效率。
區塊鍊,順豐對接了很多客戶與商家,對于商家來說,首先需要確定快件是可信的能夠做貨物的交易與交換。這塊涉及的基本上都是品牌商家,溯源與存證的業務順豐也有涉及。
IoT,就像之前提及到的,因為順豐地面部隊較多,相應需要采集的資料也會比較多。我們的部分包裹中是有傳感器的,車輛也有相關的傳感器,如車輛的攝像頭,以及快遞小哥的手環(包含地理位置、員工的健康狀态,對應做一些關懷的舉動)。同時,還有一些工作場景既有叉車,也有分揀裝置,這些就需要大資料平台來做一些關聯,是以 IoT 的應用相對較多。
智慧供應鍊和智慧物流,這兩塊更多的是指如何用大資料的手段輔助業務做一些經營上的決策。比如我們有很多 B 端客戶,對于他們來說如何在每個倉庫裡備貨,如何協調以及互相調撥,這部分就由智慧物流來完成。
下面這塊就是 IOT 實踐中的一部分:
從上面可以看出物流本身的環節是非常多的,下單、小哥收件、分揀、陸運中轉等整個過程,紅色解釋部分是指我們會做的一些 IoT 與大資料結合的應用,這裡其實大部分都是基于 Flink 來完成的。
1.2 順豐大資料技術矩陣
下面這張圖是順豐目前大資料整體的架構概覽:
1、資料內建層:最下面為資料內建層,因為順豐的曆史原因,是以包含了很多資料存儲引擎,如 Oracle、MySQL、MongoDB 等,并且部分引擎仍會繼續支援。右下物聯網裝置相對較新,主要是進行包含普通文本、網絡資料庫、圖像、音頻、視訊等的資料采集。
2、資料存儲計算:實時這塊順豐目前用的最多的還是 Flink,Storm 沒有标示出來,目前我們在做遷移。消息中間件處理目前主要使用 Kafka。然後右邊存儲結構的種類就相對豐富,因為不同的場景有不同的處理方式,比如資料分析需要性能比較強的 Clickhouse;數倉和離線計算這塊還是比較傳統,以 Hive 為主結合 Spark,目前我們是結合 Flink 與 Hudi 去實作離線實時化。
3、資料産品,我們傾向的還是首先降門檻,讓内部開發與使用者更容易上手。内部同學如果要掌握如此多的元件,成本是非常高的,再加上規範化會導緻溝通、維護以及運維的高額成本,是以我們一定要去做一些産品化、規範化的事情。
1.3 順豐科技資料采集組成
上圖就是我們大資料整體資料采集的概覽,資料采集目前包括微服務的應用,部分資料直發到 Kafka,還有些會落成日志,然後我們自己做了一個日志采集工具,類似于 Flume,更加的輕量化,達到不丢、不重、以及遠端的更新、限速。另外我們也會将 Kafka 中的資料通過 Flink 放到 HDFS,以 Hudi 的形式去做。下面會詳細介紹。
1.4 順豐資料應用架構
上圖是一個簡單的應用架構,剛才所說的大資料平台資料我們會按需推送到 OLAP 分析引擎、資料庫,這部分資料推送過去之後,到達資料服務平台。該資料服務平台主要是考慮到使用者或研發對接資料庫更便捷,以往在使用時,内部使用者首先需要了解大資料元件的使用,而現在通過我們的資料服務産品以配置化的方式配置查詢條件、聚合條件即可,最終把結果生成一個 restful 接口,業務系統可直接調用。比如研發使用者需要做搜尋,隻需要關注入參、出參,中間的過程不需要了解,這樣的話就能夠最大化的把技術門檻降下來,使用時也會更高效簡便。
中間部分我們是基于 Kong 做的網關,在 Kong 裡面可以加很多種通用的能力,包括監控、限流、緩存等都可以在裡面完成。
右邊的 Graphql,是 Facebook 開源的一個元件。前端使用者經常會出現需求的變更,背景接口需要相應地進行調整,這種情況就可以使用 Graphql 來支援。這裡其實是有兩個東西:apollo、graphql_Java,兩條線,apollo 适用于前端的研發使用者,用 node_js 來完成控制層的内容;graphql_Java 适用于後端的使用者,主要提供一些接口。
2、Hudi on Flink
2.1 Hudi 介紹
接下來我們主要介紹 Hudi on Flink 在順豐的應用實踐。Hudi 的核心優勢主要分為兩部分:
● 首先,Hudi 提供了一個在 Hadoop 中更新删除的解決方案,是以它的核心在于能夠增量更新,同時增量删除。增量更新的好處是國内與國際現在對隐私資料的保護要求比較高,比如在 Hive 中清理删除某一個使用者的資料是比較困難的,相當于重新清洗一遍資料。使用 Hudi 可以根據主鍵快速抓取,并将其删除掉。
● 另外,時間漫遊。之前我們有很多應用需要做準實時計算。如果要找出半個小時内的增量到底是什麼,變化點在哪,必須要把一天的資料全撈出來,過濾一遍才能找出來。Hudi 提供時間漫遊能力,隻需要類似 SQL 的文法就能快速地把全部增量撈出來,然後背景應用使用時,就能夠直接根據裡面的資料做業務的更新,這是 Hudi 時間漫遊裡最重要的能力。
Hudi 有兩種的寫的方法:
● copy on write。
◎ copy on write 這種形式更多是在每次寫的時候,能夠重寫曆史中關于更新記錄所在的檔案,把它重寫并且把增量部分再重新記錄下來,相當于把曆史狀态也給記錄下來。唯一的不足之處在于,寫的時候性能會稍微弱,但是讀的性能是很強的,和正常使用 Hive 沒有什麼差別。這個也是 Hudi 本身的優點。實時性略低,這部分取決于寫的檔案合并的頻率。不過批量的話,寫也不會影響到多少性能,是以本身也是批量的去寫。比如每隔幾分鐘寫一次,這個其實也不會産生很高的性能損耗,這就是 copy on write。
● merge on read
◎ merge on read 就是寫的時候實時會把 log 以 append 方式寫到 HDFS 中并寫成檔案,然後在讀的時候将已經生成的文本,再加上增量的部分合并,做一個 merge 操作。好處在于查詢的時候資料都是實時的,但是由于查詢任務确實較多,相當于是說每次查的時候,都要把兩部分資料取出來并做一個合并,是以也會造成損耗。
以上是 Hudi 情況的簡單介紹。
2.2 Hudi on Flink 組成部分 - 資料庫實時化
上圖是我們将資料實時化 CDC 的過程。資料庫的 CDC,基本上都是隻能到庫級别、庫粒度。前面的 source 支撐肯定也還是庫粒度,中間會經過兩個過程:
● 一部分是 DML,它會有過濾,當庫裡面有 100 張表時,很多時候有些表是不需要的,這部分我們會直接過濾掉,過濾就主要是通過産品化來打通它。
● 另一部分是 DDl,能夠實時更新 schema。比如庫表字段的增加或者變更,再或者可能加了個表或者改了一個表,這部分會在實時程式中打通資料直通車,隻要有任何變更,就會生成一個新的版本,然後将中繼資料資訊記錄到直通車裡,同時也會包裝到 binlog kafka sink 裡記錄,每一行會打上相應的版本号。這樣的話就對于後面的使用就能夠直接對應該條記錄,使用非常友善,不會有出錯的情況。
2.3 Hudi on Flink 組成部分 - 數倉實時化
這部分主要分享我們數倉實時化的過程,我們的目标是實作 Kafka 裡的資料在目前離線數倉中也能真正用起來,包括很多做準實時計算的使用者也能夠真正用起來。Hudi on Flink 就是我們嘗試的方案。以前 Hudi 這塊也做了 Hudi on Spark 方案,是官方推薦使用的方案,其實相當于多元護一個元件,但是我們大方向上還是希望所有實時的東西都能夠讓 Flink 去完成,另外也希望是 Flink 的應用生态能夠做得更加全面,在這部分就真正去把它落地下來,并且在生産中應用起來。
其實整個過程,比如做表資料實時化的時候,它是分為兩部份,一部分資料初始化,在啟動的時候,會把資料重新做批量的拉取,這個是用 Flink batch 來做的,其實社群本身也有提供這種能力。另外 Hudi 本身也具備把存量的 Hive 表 Hudi 化的能力,這是 Hudi 最新才出來的功能。這部分我們會用 Flink batch 的方式重新抽一遍,當然也有存量,對于存量的一些表,可以直接用存量表來轉化,然後用 Flink batch 做初始化。
另外一部分是增量更新,增量更新是指有個 DB connect 對接 Kafka,從 Kafka 的 source 拿到資料庫增量 CDC 的 binlog,然後把 binlog 進行加工,同時再利用 Flink 本身的 checkpoint 機制(Flink 本身的 checkpoint 整體頻率可以控制)進行 snapshot 的過程。其中所做的内容也我們自己可以控制的,是以采用 checkpoint 的形式可以把 Hudi 所需要做的 upsert 的操作全部在 checkpoint 中更新到線上,最終形成 Hudi 裡面的實時資料。
2.4 Hudi 數倉寬表方案
直接将 Kafka 資料扔到 Hudi 裡相對容易,真正困難的點在于寬表。對于整個 Hudi 來說,寬表是涉及到很多元表,當很多元表或者事實表更新的時候,會由多個事實表做一個關聯。但不是每個事實表都能抓到寬表的真正主鍵,是以 Hudi 沒法做這種更新。是以如何把寬表做資料實時化是一個難題。
上圖是順豐的寬表方案。
● 第一層,對于 ODS,可以直接連接配接 Kafka,用 Hudi on Flink 的架構就能夠完成。
● 第二層,DWD,這裡也有兩種辦法:
一種是用 Flink SQL 先把實時的 Kafka 寬表做完,不過這種辦法成本會高一點,相當于再次引入了 Kafka,整個資料鍊路變長,如果真正需要去用實時寬表可以小部分去推,但如果不存在純實時資料的需求,就沒有必要去做 DWD 的實時 Kafka 寬表。
另外,在沒有 DWD 的實時 Kafka 寬表的情況下,如何完成上述離線層的 DWD 實時化?這裡有幾個步驟,首先建立一個維表的 UDF 做表關聯,也是最友善的方式。其次,可以考慮直接用 join 的方式,用兩個實時表來做關聯,但可能存在關聯不到的情況。
當然,做維表關聯,就涉及到外鍵主鍵的映射。外鍵主鍵映射是為了讓我們能夠在另一個事實表更新時,快速找到主鍵在哪,即外鍵主鍵的映射 。另外主鍵索引,主鍵索引其實也是跟外鍵主鍵的映射相關。至于外鍵主鍵的映射,相當于把它建成一個新的表主鍵索引擷取,這樣增量更新 Hudi 跟原來的 ODS 層就基本上一緻了,這就是寬表實時加工的過程。下圖為運單的寬表舉例。
3、産品化支援
上述從技術層面分析了順豐當下業務架構的相關情況,以下将分享我們在産品化上所做的一些支援工作。
3.1 資料直通車
上圖是我們的資料直通車,能夠做到讓使用者自己在産品中操作,不需要寫代碼即可完成,可以實作低門檻的快速簡便的應用。比如配置資料接入僅需 1 分鐘左右,整個過程就是在産品上以配置化的手段就能夠将資料最終落在資料庫,我們的離線表、數倉、做資料分析都能夠直接快速的運用起來。
另外,資料接入進來之後,需要有資料管理的能力。上圖是資料管理能力測試環境的簡單情況,我們需要讓使用者能夠管理相關的資料,首先誰用它了,其次它涉及什麼字段,有哪些具體的内容,同時它裡面的血緣關系又是怎麼樣的,這個就是我們資料資産管理所具備的功能。
3.2 實時資料使用
上圖是我們 binlog 的 SDK,其實像 binlog 這種 avro 的格式,對使用者來說使用有一定門檻。但還是有一些編碼的使用者,對于這些使用者我們提供具體的 SDK,是以在 SDK 裡真正使用時都做到簡便。左邊看起來是 json,實際上是 avro 格式。右邊的内容就是在 Java 上的使用情況,這個是在代碼層面輔助研發快速應用的工具。
我們在平台上也做一些簡化的内容,首先有一部分是關于拖拽的,拖拽是指封裝一些元件,使用者可以通過拖拽來快速完成其需求。這個産品上線後,很多之前沒有任何實時計算的經驗,甚至連離線開發的經驗也沒有的使用者都能夠做實時的資料開發。
上圖為實時名額采集,産品上線之後有很多監控的需求,Flink 本身提供很多 Metric,使用者也有很多 Metric,我們希望為使用者提供一個高效的解決方案,把 Metric 全部采集出來,讓使用者能夠快速應用。
這裡在監控裡面也做了幾個工作,一個是爬蟲方案,實作一個 akka 的用戶端,Flink 本身是 akka 的架構,每個 jobmannager 都有 akka 的服務、接口,這樣隻要實作一個 akka 的用戶端,就能夠以 akka 的 API 形式擷取具體的 Metric 情況。這部分采集完之後發到 Kafka,最終存到 TDengine 再到 Grafana,提供給使用者。Grafana 也會整合到我們的實時計算平台産品裡面來,在面對存量的情況時,不需要重新開機使用者的任務,就能夠直接做資料采集。
但在面對增量情況時,就需要補充一些 Metric,比如 CPU 使用率、記憶體的使用率等。這部分我們以 Reporter 方案來滿足,Reporter 方案也是社群目前主推的方案。Reporte r 方案的原理其實是在 Flink 的 Metrics Reporter 裡進行插件開發,然後發到 gateway,這個 gateway 其實就是為了避免 Kafka 用戶端過多的問題,是以這裡中間做一個網關,後面還是和上面的一緻,這個就是 Flink 的任務監控情況。
4、後續計劃
上述已經分享了我們在内部已經落地、實際應用的過程,後續我們還會做什麼?
4.1 彈性計算
首先,彈性計算。目前像監控任務,使用者申請的資源遠遠超過實際需要使用的資源,會造成嚴重的資源浪費,記憶體也一樣。處理類似情況時,我們使用了 Flink 延伸的架構 Metrics monitor,結合采集的 Metrics,能夠做到當整個使用率過低或過高的時候,及時調整達到資源擴縮容或者并發擴容。
4.2 Flink 替換 Hive 演進
上面提到我們存量是有非常多的 Hive 任務,包括 Spark 任務需要進行替換,但怎麼去做呢?
首先我們用 Flink 來替換,由于強制或平台自動推薦都有難度,是以我們做了一些折中方案。比如埋點,當需要把資料寫到 Hive 的某個表,它會經過 Hiveserver,SQL 解析之後,此時将表進行替換,執行兩個路線:一個是正常的 table 這樣執行會寫到 Hive 裡面去。另外也會埋點把寫的表替換成另一個表,然後同時再以 Flink 的形式去執行一遍,不過會産生額外的資源消耗,執行大概生成兩個表,需要自動計算兩者是否一緻。如一緻測試穩定後就能以計算架構來去替換它。
大部分任務是相容的可替換的,但也有小部分不相容的情況,這部分可以采取人工處理,以盡量實作整個技術上的統一,這部分是後續需要完成的。
4.3 批流一體化
上圖是我們做批流一體化的過程,批流一體化在中繼資料管理與權限管理部分都已經有一些落地。
除此之外我們結合剛剛所說替換的過程,上圖就是 SQL 的相容測試。因為這幾者都做完之後,其實批流一體化可以同步去做,相當于同一個接口,加一個參數,即可實作流批處理底層引擎的快速切換,有助于整個資料開發能夠保持一緻,是以批流一體化也是後面需要嘗試的。
上圖實際上是我們一體化整個架構的最終形式。首先上面有一層 IDE 能夠讓所有的使用者使用。然後下面各種基礎功能支援,包括自動補全的 SQL 文法解析功能的支援,再往下就是一些資源管理、排程管理和知識管理,這些也是為了輔助開發而用的。再下面一層是計算引擎,要把這些計算引擎跟使用者做一個大的隔離,讓使用者不用再關注底層技術的實作和使用,這是我們後面的要持續去做的事情。