天天看點

Flink 流批一體在 Shopee 的大規模實踐

作者:散文随風想

摘要:本文整理自 Shopee 研發專家李明昆,在 Flink Forward Asia 2022 流批一體專場的分享。本篇内容主要分為四個部分:

1. 流批一體在 Shopee 的應用場景

2. 批處理能力的生産優化

3. 與離線生态的完全內建

4. 平台在流批一體上的建設和演進

01

流批一體在 Shopee 的應用場景

Flink 流批一體在 Shopee 的大規模實踐

首先,先來了解一下 Flink 在 Shopee 的使用情況。

除了流任務,僅從支援的批任務來看,Flink 平台上的作業已經到達了一個比較大的規模。

目前 Flink 批任務已經在 Shopee 内部超過 60 個 Project 上使用,作業數量也超過了 1000,這些作業在排程系統的支援下,每天會生成超過 5000 個執行個體來支援各個業務線。

Flink 流批一體在 Shopee 的大規模實踐

從應用場景劃分,這些作業在 Shopee 主要分為以下四個部分:

  • 第一個應用場景是資料內建領域。
  • 第二個應用場景是數倉領域。
  • 第三個應用場景是特征工程,主要用于實時和離線特征的生成。
  • 第四個應用場景是風控反作弊領域,用做實時反作弊和離線反作弊。

從 Shopee 内部的業務場景來看,數倉是一個流批一體發揮重要作用的領域。

目前,業内還沒有這樣一個端到端流式數倉的成熟解決方案,大部分都是通過一些純流的方案 + 離線數倉方案 + 互動式查詢方案疊加起來達到近似效果。

在這類 Lambda 架構中,Flink 流批一體主要帶來的優勢是實作計算統一。通過計算統一去降低使用者的開發及維護成本,解決兩套系統中計算邏輯和資料口徑不一緻的問題。

但這樣的 Lambda 架構複雜性又太高了。是以針對時延要求不高的業務,Shopee 實時團隊主推通過 Flink+ Hudi 的替代方案,建構近實時數倉,這種方案可以解決很多場景的問題。

這種方案的好處很明顯,它實作了部分的流批一體:Flink 統一的引擎,Hudi 提供統一的存儲。它的限制也很明顯,Hudi 資料可見性與 Commit 的間隔相關,進而與 Flink 做 Checkpoint 的時間間隔相關,這延長了整個資料鍊路的時延。

目前這種 Flink+Hudi 的方案已經在 Shopee 内部很多業務線上進行使用。比如廣告業務的 Deep ads.和 offline-platform ads.用于給廣告主和産品營運産出廣告資料。又比如 Shopee 核心業務 supply chain 的 WMS。WMS 的資料服務整體使用的是 lambda 架構。但對于核心業務使用該方案生成的近實時資料,用于與離線資料做 diff,監控實時鍊路提供的資料品質。

Flink 流批一體在 Shopee 的大規模實踐

上圖 PPT 展示了 Datamart 的一個例子。Datamart 使用 Hudi partital update 完成 DIM 表的 Join 更新,降低資源使用量。

之前,他們每小時對最近 3 小時的資料進行計算和重新整理,在保證資料及時更新的情況下,解決資料延遲、Join 時間不對齊等問題。但随着資料量的迅速增長,小時級資料 SLA 的保障難度和計算資源的消耗都在不斷增加。

現在,使用者一方面通過 Flink 加速計算,另一方面通過與批處理結合來確定資料的最終一緻性。并通過提供分級的結果表來滿足不同場景的及時性要求,實時計算産出的 Partial Update Hudi 表提供部分核心實時資料,批處理産出的 Multi-version Hudi 表提供完整且更準确的資料。

最終,在確定資料一緻性的基礎上,達到了分鐘級延遲,并有效降低了計算資源的消耗。

Flink 流批一體在 Shopee 的大規模實踐

除了業務線使用之外,目前 Shopee 内部提供的一些平台服務也在使用 Flink。

第一個例子是 Data Infra 團隊提供的 Data Hub, 它提供了一些離線內建和實時內建的常用子產品。之前他們必須引入不同的引擎來支援不同的內建子產品,導緻項目依賴複雜,使用者也需要了解多套引擎。

使用 Flink 後,在之後新的需求中,Data Hub 不再需要引入不同的引擎來解決批和流兩套資料的內建。

Flink 流批一體在 Shopee 的大規模實踐

第二個例子是 Feature Station,Feature Station 是 Shopee 内部提供的一個 特征生成的平台。它提供了一些降低使用者運維成本的功能,比如 Feature 生成 SQL 化,支援多業務線并行開發等等。

之前這個平台的任務依賴 Spark,後來從 Spark 全部遷移到了 Flink。Flink 帶來的一個很大的優勢是便于擴充。如果之後使用者有實時特征需求,使用者可以将離線特征的生成邏輯非常快速的複用到實時特征上。

上面介紹的都是 Shopee 内部流批一體應用場景的一些例子,我們内部還有很多團隊也正在嘗試 Flink 的流批一體,未來會使用的更廣泛。

02

批處理能力的生産優化

Flink 在流處理方面一直有着天然的優勢,相對而言,批的能力較弱一些。我們都能看到,社群最近幾個版本中,一直在大力推進 Flink 批處理的能力。而對批支援的好壞也一直是使用者選擇 Flink 流批一體的一個重要影響因素。下面将基于内部的實踐,我将介紹一下 Shopee 對 Flink 批在生産上的一些優化,主要分為穩定性和易用性兩個方向。

2.1 穩定性

Flink 流批一體在 Shopee 的大規模實踐

批作業一般都是通過排程系統周期性排程的。使用者一般會管理大量的批作業,是以在生産實踐中,他們非常關注作業的穩定性。

Flink Batch 在使用過程中,我們主要遇到了以下的問題:

  • 當大作業執行時間長時,任務越容易遇到各種問題,失敗次數會顯著增加。
  • Task 失敗後 failover 的成本過高,作業的整體耗時會被嚴重拉長。

這樣就形成了惡性循環,執行時間越長越容易失敗,而失敗又反向拉長了執行時間。這裡的根本的問題是 Shuffle。

Flink 流批一體在 Shopee 的大規模實踐

Flink 目前提供了兩種 Shuffle,Hash Shuffle 和 Sort Shuffle,但這兩種 Shuffle 的不同主要是表現在 Shuffle 資料的結構上,從 Shuffle 的整體架構上看,兩者都是 Internal Shuffle。Internal Shuffle 就是 Shuffle 服務與 Task 共享程序,TaskManager 在 Task 執行完成之後還要繼續保留去做 Shuffle 服務。

Internal Shuffle 的問題主要有兩個:

第一個是:Shuffle 服務的穩定性會被有問題的 task 所影響。

  • 這個有問題 task 可能來自 job 本身,也可能是同機器的其他 job。在 Shopee 内部,Spark 與 Flink Batch 跑在相同的離線叢集,是以也會受到其他類型離線任務的影響。同樣,yarn 的穩定性也會影響 Flink batch 任務。
  • 按照現在的 failover 邏輯,TaskManager 無論是由于内部原因還是外部原因導緻崩潰,Task 都會重跑,Shuffle 資料也都會丢失。盡管可能隻是部分 Task 重跑,但因為我們目前使用的 1.15 沒有推測執行,是以也會導緻 Job 整體執行時間嚴重拉長。
Flink 流批一體在 Shopee 的大規模實踐

第二個是:當 Task 完成之後,由于 TaskManager 不能立刻被釋放,還要提供 Shuffle 服務,這就導緻 Yarn 必須維護 Task 執行完的 container,造成叢集資源使用率不高。

Flink 流批一體在 Shopee 的大規模實踐

針對 Internal Shuffle 的問題,其實業界也已經有了成熟的方案,那就是 Remote Shuffle。

上圖中展示了兩張架構圖,一個是 Internal Shuffle,另一個是 Remote Shuffle,其實還有一個 external Shuffle,External Shuffle 就是把 Shuffle 服務拆分到另一個程序中。Spark 使用的 yarn auxilary external Shuffle,就是把 Shuffle 服務挪到了 Node Manager 裡面,但是這還是存儲跟計算混合在一起的架構。

是以我們選擇一步到位,使用 Remote Shuffle。就是轉門搭建一套 Shuffle 叢集來提供 Shuffle 服務。

這種存儲與計算分離的架構有以下幾個好處:

  • 計算和存儲再也不會互相影響。Shuffle 服務與使用者的代碼完全隔離。
  • 将 Shuffle 的工作轉移到 Remote Shuffle 叢集後,Task 執行完畢時,Task Manager 的資源可以立刻被釋放。
  • 在這種架構下,計算跟資源解耦 了,我們可以自由的擴充或者收縮各自的資源量。
Flink 流批一體在 Shopee 的大規模實踐

業界有不少 Remote Shuffle 的方案,比如阿裡雲的 Celeborn,位元組的 Cloud ShuffleService,另外還有 Uber Remote Shuffle Service,Splash 等等。但是這些 Remote Shuffle 大部分主要是為了支援 Spark,支援 Flink 的并不多,另外有一些隻在内部版本中支援 Flink。

最後在選型的标準裡面,我們主要考慮了項目本身的成熟度,社群對 Flink 的支援度,與 Flink 的比對程度,最終還是采用了 Flink Remote Shuffle。

這個方案有幾個好處:

  • Flink Remote Shuffle 是 Flink 的一個擴充項目,原生就是為了支援 Flink,社群的支援力度大,之後有了問題可以跟社群多交流。
  • 目前 Flink 的 Batch 正在快速發展,每個版本都有很大的變動和提高,比如 1.16 的 hybrid Shuffle 和推測執行,其他的 Remote Shuffle 不可能這麼快速跟進。

雖然 Flink Remote Shuffle 也有缺點,但暫時可以忍受。另外 Remote Shuffle 其實是跟計算引擎分離的,等之後 Flink Batch 的特性穩定了,我們最終希望是離線能共用一套 Remote Shuffle service。

Flink 流批一體在 Shopee 的大規模實踐

在叢集部署方案上,我們采取了跟 Presto 混部的方案。主要的考慮是為了充分的利用資源,Presto 和 Remote Shuffle 在資源使用上剛好互補。Remote Shuffle 本身是一個存儲服務,它不怎麼使用 CPU 和 memory,但會占用大量的磁盤。相反,Presto 會占用大量的 CPU 和 memory,磁盤使用量相對較少。

另外,從時間上看,Batch 任務更多的集中在晚上,互動式查詢更多集中在白天,這也有利于資源複用。再就是為了避免互相影響,我們使用 Ggroup 來為兩個服務提供資源限制和隔離。

Flink 流批一體在 Shopee 的大規模實踐

最後,我們搭建了一個有 145 個節點的 Shuffle 叢集,為線上的 Batch 任務提供 Shuffle 服務。其中每個節點使用一個 3TB 的 SSD 來儲存資料,有效保證 Shuffle 資料的存取性能。

Flink 流批一體在 Shopee 的大規模實踐

在叢集搭建好之後,我們也在 Remote Shuffle Service 上做了一些測試和生産驗證。從上圖就可以看到效果。

從性能上看,相比 Hash Shuffle , Remote Shuffle 的性能提升了 19.3%, 相比 Sort Shuffle, 性能提升了 6.1%。

從穩定性上看,我們取了一個之前非常不穩定的 project。結果是 Task 失敗率降低了超過 70%。

是以無論從性能還是穩定性,Remote Shuffle 都能帶來很好的收益。

Flink 流批一體在 Shopee 的大規模實踐

當然,Remote Shuffle 這個項目也還有一些問題。

  • 網絡環境的異常波動會導緻 Shuffle 服務不穩定,表現出來主要是 ShuffleClient 與 ShuffleWorker 之間連接配接中斷。這反映了一個問題,就是資料重傳機制的缺失。
  • 另外就是沒有多租戶資源隔離機制,無論是帶寬還是磁盤資源,目前都沒有隔離機制,這會導緻不同 Job 之間互相影響。

當然,這些問題也都在不斷改進中,總的來看,Flink Remote Shuffle 對 Flink Batch 有很大的幫助。

2.2 易用性

Flink 流批一體在 Shopee 的大規模實踐

除了上面針對 Shuffle 的優化之外,Shopee 也在易用性方面做了很多工作。大家都知道,對于流批一體,Flink SQL 為核心載體。在使用過程中,SQL 也存在一系列使用上的困難。

第一個問題是 SQL 任務有問題後,對于使用者而言定位困難。之前我們的流任務主要依賴 web UI,沒有 History Server。有了 History Server 之後,定位 Task 的問題得到了緩解。但是還有一個比較麻煩的事情,就是 SQL 任務經過 Planner 優化之後,執行計劃與 SQL 結構上有了較大差異,使用者使用過程中,經常很難根據 Task 資訊定位到相關的 SQL 語句。

第二個問題是 SQL 配置困難。SQL 任務各 Task 之間資源使用經常不均衡,有的是 CPU 密集型,有的是記憶體密集型,很難通過統一的 TM 配置來解決。社群 SQL API 也并沒有提供細粒度資源配置的接口。導緻一些進階使用者希望優化資源使用量時,SQL 任務的資源配置十分困難。

Flink 流批一體在 Shopee 的大規模實踐

針對 SQL 問題分析定位的難點,我們做了兩點優化:

  • 在使用者送出 SQL 任務之前,展示作業的 streamgraph,讓使用者執行之前就能看到 SQL 的執行邏輯,以判斷是否符合自己的預期。
  • 第二個優化就像上圖中展示的一樣,将執行節點轉換成對應 SQL,讓使用者知道每個 Task 的對應的 SQL 段,幫助定位問題位置。
Flink 流批一體在 Shopee 的大規模實踐

另外,一些算子為了在不同的資料下有更好的性能,同一個算子會有多種實作方案,比如 join。一些使用者在排查問題時,會關心優化器對 SQL operator 的具體實作邏輯。是以,除了展示每個 Task 的對應的 SQL 之外,我們還提供展示 SQL 算子對應生成的 Java code,以确定算子底層實作邏輯,輔助排查 SQL 故障。

Flink 流批一體在 Shopee 的大規模實踐

針對第二個 SQL 任務資源優化的問題,我們在展示 streamgraph 的基礎上,允許為不同的 operator 配置不同的并發度,連結政策還有 slot group 等等。

Flink 流批一體在 Shopee 的大規模實踐

在資源配置上,我們并沒有使用社群提供的 operator 級别的細粒度資源配置。主要有兩個原因:

  • Slot 資源使用量使用者很難監控,目前最多監控到 TM 粒度。這導緻使用者沒有監控依據,無法準确預估每個 slot 的資源使用量。
  • 動态資源切割機制導緻機器上出現大量碎片。

我們最後使用了自己開發的 SlotGroup 級别的資源配置,整體思路是不同的 SlotGroup 申請不同規格的 TM,Slot 依然是均分 TaskManager 的資源,但可以通過為不同的 Operator 設定不同的 SlotGroup,進而設定不同的資源量。

這種方案讓使用者可以很友善的依據 TaskManager 使用監控,定位到配置不合理的 SlotGroup 和 Operator, 進而調整 TM 資源配置,優化作業的整體資源使用率。

上圖中的功能依賴于我們内部開發的“SlotGroup 粒度的資源排程”。

Flink 流批一體在 Shopee 的大規模實踐

當然除了以上對 Batch 的優化之外,我們還進行很多其他的優化。比如複用 stream 模式下 compact 小檔案的邏輯;調整容錯機制, 支援 Batch SQL 的小檔案 compact

還有就是 parquet 的 nested projection/filter pushdown;優化超過 64 位 GroupId 生成政策;優化 FileSourceCoodinator 建立邏輯等等。

這些優化都有效解決了生産過程中 Shopee 各個業務線遇的問題。

03

與離線生态的完全內建

在流批一體落地的過程中,使用者最關心的就是技術架構的改動成本和潛在風險。作為 Flink 平台,面臨的一個很重要的挑戰就是如何相容好使用者已經廣泛應用的離線批處理能力。是以第三部分主要介紹與離線生态的內建,主要涉及開發和執行兩個層面的問題。

3.1 開發層面

Flink 流批一體在 Shopee 的大規模實踐

開發層面主要是複用的問題,複用的目的是為了降低使用者的使用成本。由于很多使用者已經在其他引擎上積累的大量的業務 UDF,是以我們提供的統一 UDF 來解決 UDF 複用的問題。

Flink 流批一體在 Shopee 的大規模實踐

統一 UDF 的目标是為了使用者能在 Flink 平台上無縫通路各種 UDF。目前我們已經支援了很多類型的 UDF。

  • Flink 本身的 UDF,我們将很多 Flink build-in function 下放支援低版本。
  • 增加了一些 Shopee 内部常用的 UDF,使用者也可以上傳共享自定義的 UDF。
  • 針對其他引擎的 UDF,我們依賴 load module 支援了的 Hive UDF。對于 Spark build in 的 UDF,為了降低使用者使用成本,我們也把大量常用的 Spark UDF 遷移到了 Flink。
  • 值得一提的是,我們團隊目前已經支援了 SQL 語句中加入 Java 代碼并解析成 UDF。上圖中有個例子,之後我們還會支援 lambda 表達式等等,這将大大友善使用者對 UDF 的使用。
Flink 流批一體在 Shopee 的大規模實踐

除了複用 UDF 以外,我們還通過統一進制資料來複用已經存在的離線資料模型。與其他各自已有的中繼資料管理一起,加上依賴 HDP scheme registry 建構的實時中繼資料,一起建構形成 Unity Catalog。

使用者可以隻通過 Unity Catalog 來通路底層不同的資料,在平台提供的 SQL IDE 中,可以十分友善的通路已有的 Catalog 和資料表。目前已經支援了 Kafka,Hive,Hudi, Redis, Hbase 這幾個不同的資料類型。

3.2 執行層面

Flink 流批一體在 Shopee 的大規模實踐

在執行層面,随着 Flink 能力的增強,使用者希望 Flink SQL 批任務嵌入到目前的資料加工過程中,作為中間的一個環節。是以我們将 Flink Batch 接入了 Shopee 内部的統一排程平台 Data Scheduler。并且通過統一的資料 marker 服務來進行資料依賴。最終将 FlinkBatch 與已有的其他資料處理引擎打通,更好的服務使用者。

Flink 流批一體在 Shopee 的大規模實踐

另外,在離線領域,清晰的血緣是對資料進行追溯和影響分析的基礎。當資料有了清晰的血緣和歸屬,系統中的資料就有了清晰的結構。

我們團隊目前除了通過上一張 PPT 提到的資料 marker 來提供資料依賴關系之外,

還從 gragh 中抽取 Source,Sink,Lookup 的中繼資料資訊,報告給 Datamap,以生成更完整的資料血緣。

當然除了離線資料的中繼資料之外, 我們也正在設計将實時資料的中繼資料整合到現有的資料血緣中,徹底将所有資料的歸屬打通。

04

平台在流批一體上的建設和演進

最後我想介紹一下我們 Flink 平台在流批一體上的建設和演進。其實在上面介紹中,已經展示了不少平台的功能。是以這一部分,我隻會重點介紹一下平台對運維工具 History Server 的優化。

其實 Flink 流任務對 History Server 的需求并不大,因為流任務理論上一直在運作,我們可以用 web UI。但是對于批任務,History Server 卻是一個非常有效的運維追溯工具。

4.1 HistoryServer 接入 Yarn 日志

Flink 流批一體在 Shopee 的大規模實踐

首先我要宣傳一下 1.16 的新特性:跳轉外部 log。

雖然我們平台已經将使用者的日志接入的 kibana,但是因為日志是混合的,是以查詢的時候使用者要先定位到 subTask,然後需要輸入各種篩選條件查詢,查詢流程比較長,速度也比較慢。是以我們一直想優化這個流程,在最近釋出的 1.16 中,支援了接入外部 log 的功能,我們針對日志較少的 Batch 任務,直接使用該特性跳轉到 yarn 的 history log,十分友善檢視問題 Task 的全量日志。

4.2 HistoryServer 小檔案問題

Flink 流批一體在 Shopee 的大規模實踐

另外,History Server 還有一個小檔案的問題。從上圖左側可以看到,History Server 将曆史任務存儲為大量 Json 小檔案用于服務 Web UI。當隻支援流任務的時候這個問題并不明顯,但是随着我們平台支援批任務後,曆史任務的數量劇增。

數量的上漲帶來的幾個問題:

  • 大拓撲,大并發的任務的解壓對 History Server 服務産生壓力。
  • 曆史任務産生的大量檔案對部署節點的檔案系統産生大量存儲開銷。大量小檔案導緻單個 History Server 隻能儲存很短時間的曆史任務。不然就會将單機的 inode 耗光。
  • History Server 目前重新開機後需要重新拉取曆史 Job 資訊并解壓。

這些都給生産帶來了問題。

4.3 HistoryServer 優化方案

Flink 流批一體在 Shopee 的大規模實踐

是以我們對 History Server 整體架構做了優化,整體的思路是,隻對需要的曆史 Job 進行解壓。

第一,是拆分拉取和解壓兩個功能,将原有的 Fetcher Executor 拆分成了 Fetcher Executor 和 Unzip Executor。Unzip Executor 專門處理 archivedJobFile 解壓。

第二,增加 archivedJobs 目錄存儲壓縮後的曆史任務檔案,從遠端拉取的曆史任務不立刻進行解壓。而是當使用者通路時增加一個解壓任務進行解壓。

這樣就減少了 History Server 的工作量,降低了 History Server 的負載,也降低了部署節點的存儲開銷。這個方案在我們線上使用後,将存儲開銷降低了 90%以上,效果十分明顯。

4.4 Flink 平台的演進

Flink 流批一體在 Shopee 的大規模實踐

最後,簡單介紹一下 Shopee Flink 平台支援批任務的發展過程。

我們内部支援 Flink 的批是從去年三季度開始的,到現在為止一年多。從改造平台支援 Batch,到并入離線生态,打通依賴和血緣,再到搭建 Remote Shuffle。有效的支撐起了 Shopee 各個業務線對 Flink 流批一體的需求。

整個落地過程中,最主要經驗的是要站在使用者的視角看待問題,合理地評估使用者的改動成本以及收益,幫助使用者找出業務遷移的潛在風險,降低使用者使用的門檻。

未來規劃主要還是在業務拓展方面。我們會加大 Flink 批任務的推廣,探索更多流批一體的業務場景。同時跟社群一起,在合适的場景下,加速使用者向 SQL 和流批一體的轉型。

繼續閱讀