天天看點

Flink 引擎在快手的深度優化與生産實踐

摘要:本文整理自快手實時計算團隊技術專家劉建剛在 Flink Forward Asia 2021 生産實踐專場的演講。主要内容包括:
  1. 快手 Flink 的曆史及現狀
  2. Flink 容錯能力提升
  3. Flink 引擎控制與實踐
  4. 快手批處理實踐
  5. 未來規劃

一、快手 Flink 的曆史與現狀

Flink 引擎在快手的深度優化與生産實踐

快手從 2018 年開始對 Flink 進行深度整合,經過 4 年發展,實時計算平台逐漸完善并賦能周邊各種元件。

  • 2018 年我們針對 Flink 1.4 進行了平台化建設并大幅提升運維管理能力,達到了生産可用。
  • 2019 年我們開始基于 1.6 版本進行疊代開發,很多業務都開始實時化,比如優化 interval join 為商業化等平台帶來顯著收益、開發實時多元分析加速超大多元報表的實時化,這一年我們的 Flink SQL 平台也投入使用。
  • 到了 2020 年,我們更新到 1.10,對 sql 的功能進行了非常多的完善,同時進一步優化 Flink 的核心引擎,保障了 Flink 的易用性、穩定性、可維護性。
  • 2021 年我們開始發力離線計算,支援湖倉一體的建設,進一步完善 Flink 生态。
Flink 引擎在快手的深度優化與生産實踐

上圖是快手基于 Flink 的技術棧。

  • 最核心、最底層是 Flink 的計算引擎,包括流計算和批處理,我們針對穩定性和性能做了大量工作。
  • 外面一層是跟 Flink 打交道的周邊元件,既有 Kafka、rocketMQ 等中間件,也有 ClickHouse、Hive 等資料分析工具,還有 Hudi 等資料湖的使用。使用者可以基于 Flink 和這些元件建構各種應用,涵蓋了實時、近實時、批處理的各種場景。
  • 最外層是具體的使用場景,常見的有電商、商業化等視訊相關的業務方,應用場景包含機器學習、多元分析等。另外還有很多技術部門基于 Flink 來實作資料的導入、轉換,比如 CDC、湖倉一體等。
Flink 引擎在快手的深度優化與生産實踐

應用規模上,我們有 50 萬 CPU 核,主要通過 Yarn 和 K8s 的方式進行資源托管,上面運作着 2000+ 作業,峰值處理達到了 6億/秒,日處理條數達到了 31.7 萬億,節假日或活動的時候流量甚至會翻倍。

二、容錯能力提升

Flink 引擎在快手的深度優化與生産實踐

容錯能力主要包含以下部分:

  • 首先是單點恢複,支援任意多個 task 失敗時的原地重新開機,long-running 作業基本可以做到永不斷流;
  • 其次,是叢集故障的應對,包含冷備、熱備以及 Kafka 雙叢集的內建;最後是黑名單的使用。
Flink 引擎在快手的深度優化與生産實踐

Flink 為了做到 exactly-once,任何節點出現故障都需要重新開機整個作業,全局重新開機會帶來長時間的停頓,最高可達十幾分鐘。有些場景不追求 exactly-once,比如推薦等實時場景,但它們對服務可用性的要求很高,無法容忍作業的斷流,還有模型訓練等初始化很慢的場景,重新開機時間特别長,一旦重新開機将會造成很大的影響。基于以上考慮,我們開發了單點恢複功能。

Flink 引擎在快手的深度優化與生産實踐

上圖是單點恢複的基本原理。如圖有三個 task,其中中間的 task 失敗了,那麼首先 Flink 的主節點會重新排程中間的 task,此時上下遊的 task 不會失敗,而是等待重連。等中間的 task 排程成功後,master 節點會通知下遊的 task 去重連上遊的 task,與此同時中間的 task 也會去連它上遊的 task,通過重新建構讀視圖來恢複資料的讀取。等上下遊都連接配接成功後這個作業就可以正常工作了。

Flink 引擎在快手的深度優化與生産實踐

了解完基本原理,再來看一下線上多 task 恢複的案例。實際環境中經常會出現多個 task 同時失敗的情況,這個時候我們會按照拓撲順序來逐個恢複失敗的 task,比如上圖中是按照從左往右的順序恢複。

這個功能上線之後,我們内部有将近 100 個作業使用了這個功能,正常故障下作業都可以做到不斷流,即便出現小的流量波動,業務也可以做到無感覺,業務方徹底告别了服務斷流的噩夢。

Flink 引擎在快手的深度優化與生産實踐

叢集故障一旦發生就是緻命性的,所有的資料都會流失,服務也會挂掉。我們的方案主要包含冷備、熱備,以及 Flink 和 Kafka 的雙叢集內建。

Flink 引擎在快手的深度優化與生産實踐

冷備主要指的是對資料做備份,叢集挂掉以後可以快速在另外一個叢集啟動計算任務。

如上圖,KwaiJobManager 是快手的作業管理服務,其中的 failover coordinator 主要負責故障處理。我們會把所有 jar 包等檔案儲存在 HDFS,所有的資訊儲存在 Mysql,這兩者都做到了高可用。作業運作在主叢集 ClusterA,線上用的是增量快照,會存在檔案依賴的問題,是以我們定期做 savepoint 并拷貝到備叢集。為了避免檔案過多,我們設定了定時删除曆史快照。

一旦服務檢測到叢集 A 故障,就會立刻在叢集B啟動作業,并從最近一次的快照恢複,確定了狀态不丢失。對于使用者來說,隻需要設定一下主備叢集,剩下的全都交由平台方來做,使用者全程對故障無感覺。

Flink 引擎在快手的深度優化與生産實踐

熱備就是雙叢集同時運作一樣的任務。我們的熱備都是全鍊路的,Kafka 或者 ClickHouse 等都是雙跑。最上面的展示層隻會使用其中一份結果資料做展示,一旦出現故障,展示層會立刻切換到另外一份資料,切換過程在一秒以内,使用者全程無感覺。

相比冷備,熱備需要等量的資源來備份運作,但切換的速度更快,比較适用于春晚等要求極高的場景。

Flink 引擎在快手的深度優化與生産實踐

Flink 與 Kafka 的雙叢集內建,主要是因為快手的 Kafka 都具備雙叢集的能力,是以需要 Flink 支援讀寫雙叢集的 Kafka topic,這樣某個 Kafka 叢集挂掉時Flink可以線上無縫切換。如上圖所示,我們 Flink 對 Kafka 雙叢集做了抽象,一個邏輯上的 topic 底層對應兩個實體上的 topic,裡面由多個 partition 組合而成,Flink 消費邏輯 topic 就相當于同時讀取底層兩個實體 topic 的資料。

針對叢集的各種變動,我們全部抽象成了 partition 上的擴縮容,比如叢集挂掉,可以看成是邏輯 topic 的 partition 縮容;單叢集切雙叢集,可以看成是邏輯 topic 的擴容;topic 的遷移,可以看成邏輯 topic 先擴容再縮容。這裡我們都是按照雙叢集來舉例,實際上無論是雙叢集還是更多的叢集,原理都是一樣的,我們都提供了支援。

Flink 引擎在快手的深度優化與生産實踐

出現以下兩種情況的時候需要使用黑名單功能。第一種是反複排程有故障的機器,導緻作業頻繁失敗。另一種是機器因為硬體或網絡等原因,導緻 Flink 個别節點卡主但未失敗。

針對第一種情況,我們開發了門檻值拉黑,如果作業在同一個機器上失敗或者多次部署門檻值失敗,超過配置的門檻值就會拉黑;針對第二種情況,我們建立了異常分類機制,針對網絡卡頓和磁盤卡頓情況,直接驅除容器并且拉黑機器。另外我們還對外暴露了拉黑接口,打通了運維 Yarn 等外部系統,實作了實時拉黑。我們還以 Flink 黑名單為契機,建立了一套完整的硬體異常處理流程,實作了作業自動遷移,全程自動化運維,使用者無感覺。

三、Flink 引擎控制與實踐

3.1 Flink實時控制

Flink 引擎在快手的深度優化與生産實踐

針對 long-running 的實時作業,使用者經常需要作出變更比如調整參數來更改行為,還有一些系統運維比如作業降級、修改日志級别等,這些變更都需要重新開機作業來生效,有時會高達幾分鐘到幾十分鐘,在一些重要的場合,這是無法容忍的。比如在活動期間或者排查問題的關鍵點,作業一旦停止将會功虧一篑,是以我們需要在不停止作業的情況下實時調整作業的行為,也就是實時控制。

Flink 引擎在快手的深度優化與生産實踐

從更廣泛的角度來看,Flink 不僅是計算任務,也是一個 long-running service。我們的實時控制正是基于這樣的考慮,來為實時計算提供互動式的控制模式。如上圖所示,使用者通過經典的 kv 資料類型與 Flink dispatcher 互動,Flink 收到消息後,會先将它們持久化到 zk 用于 failover,然後根據具體的消息做相應的控制,比如控制 resource manager、控制 job master 或者其他元件。

Flink 引擎在快手的深度優化與生産實踐

我們既支援使用者自定義動态參數,也為使用者提供了很多現成的系統控制。使用者自定義主要是使用 RichFunction 來擷取動态參數,并且實作相應的邏輯,這樣在作業運作的時候就可以實時傳入參數,達到實時控制的效果。

系統提供的實時控制能力,主要包含資料源限速、采樣、重置 Kafka offset、調整快照參數以及運維相關的更改日志級别、拉黑節點等功能。除此之外,我們還支援動态修改部分 Flink 原生配置。

快手内部對實時控制功能實作了産品化,使用者使用起來非常友善。

3.2 源端控制能力

Flink 引擎在快手的深度優化與生産實踐

Flink 處理曆史任務或者作業性能跟不上的的時候,會引發以下的問題:

首先 source 的各個并發處理速度不一緻,會進一步加重資料的亂序、丢失、對齊慢等問題。其次,快照會持續變大,嚴重影響作業性能。另外還有流量資源的不可控,在高負載的情況下會引發 CPU 打滿、oom 等穩定性問題。

由于 Flink 是一種 pipeline 實時計算,是以從資料源入手可以從根本上解決問題。

Flink 引擎在快手的深度優化與生産實踐

首先來看下曆史資料精準回放功能。上圖是以二倍速率去消費 Kafka 的曆史資料,Flink 作業追上 lag 之後就可以轉成實時消費。通過這種方式可以有效解決複雜任務的穩定性問題。

上圖的公式是一個基本原理,消費倍率 = Kafka 的時間差 / Flink 的系統時間差,使用者使用的時候隻需要配置倍率即可。

Flink 引擎在快手的深度優化與生産實踐

另外一個能力是 QPS 限速。資料流量很大的時候,會導緻 Flink 的負載很高以及作業不穩定。我們基于令牌桶算法,實作了一套分布式的限速政策,可以有效減緩 Flink 的壓力。使用 QPS 限速後,作業變得非常健康,上圖綠色部分可見。19 年的春晚大屏,我們就是通過這個技術實作了柔性可用的保障。

另外我們還支援自動适配 partition 的變更和實時控制,使用者可以随時随地調整作業的 QPS。

Flink 引擎在快手的深度優化與生産實踐

最後一個功能是資料源對齊,主要指 watermark 的對齊。首先每個 subtask 都會定期向主節點彙報自己的 watermark 進度,主要包括 watermark 的大小和速度。主節點會計算下一個周期的 target,即預期的最大 watermark,再加一個 diff 傳回給各個節點。各個 source task 會保證下一個周期的 watermark 不超過設定的 target。上圖最下面是 target 的計算公式,預測每個 task 下個周期結束時候的 waterMark 值,再加上我們允許的 maxdiff 然後取最大值,通過這種方式可以保障各個 source 的進度一緻,避免 diff 過大導緻的穩定性問題。

3.3 作業均衡排程

Flink 引擎在快手的深度優化與生産實踐

生産環境中經常會出現資源不均衡的現象,比如第一點 Flink 的 task 分布不均勻,導緻 task manager 資源使用不均衡,而作業的性能又往往受限于最繁忙的節點。針對這個問題,我們開發了作業均衡排程的政策;第二點是 CPU 使用不均衡,有些機器被打滿而有些機器很閑。針對這個問題,我們開發了 CPU 均衡排程的功能。

Flink 引擎在快手的深度優化與生産實踐

上圖中有三個 jobVertex,通過 hash shuffle 的方式來連接配接。上圖中間部分顯示了 Flink 的排程,每個 jobVertex 都是自上而下往 slot 裡排程 task,結果就是前兩個 slot 很滿而其他 slot 很空閑,第一個 task manager 很滿而第二個 task manager 很空閑。這是一個很典型的資源傾斜的場景,我們對此進行了優化。排程的時候首先計算需要的總資源,也就是需要多少個 task manager,然後計算每個 TM 配置設定的 slot 個數,確定 TM 中的 slot 資源均衡。最後均衡配置設定 task 到各個 slot 中,確定 slot 中 task 均衡。

Flink 引擎在快手的深度優化與生産實踐

實際運作過程中還存在另外一種傾斜情況 —— CPU 傾斜,我們來看下怎麼解決這個問題。上圖左側,使用者申請了一個核但實際隻使用了 0.5 個核,也有申請了一個核實際使用了一個核。按照預設排程政策,大量此類 case 可能會導緻有的機器 CPU 使用率很高,有的卻很閑,負載高的機器不論是性能還是穩定性都會比較差。那麼如何讓申請和使用的 diff 盡可能小?

我們的方案是對作業資源精準畫像,具體做法分為以下步驟:作業運作過程中統計每個 task 所在容器的 CPU 使用率,然後建立 task 到 executionSlotSharingGroup,再到 container 的映射,這樣就知道了每個 task 所在 slot 的 CPU 使用情況,然後根據映射關系重新開機作業,根據 task 所在 slot 的曆史 CPU 使用率來申請相應的資源,一般來說會預留一些 buffer。如上圖右圖所示,如果預測足夠準,重新開機後 task manager 使用的資源不變,但是申請值變小了,二者的 diff 就變小了。

其實業界一些先進的系統,比如 borg 是支援動态修改申請值的,但我們的底層排程資源不持這種政策,是以隻能在 Flink 這一層使用資源畫像來解決這個問題。當然資源畫像不能保證百分百準确,我們還有其他政策,比如限制高 CPU 負載的機器繼續配置設定資源,盡可能減少不均衡。另外我們還建立了分級保障制度,不同優先級的作業有不同的 cgroup 限制,比如低優先級作業不再超配,高優先級作業允許少量超配,進而避免 CPU 使用過多導緻的不均衡。

四、快手批處理實踐

Flink 引擎在快手的深度優化與生産實踐

上圖是我們的批處理架構圖。最底層為離線叢集,中間是 Flink 引擎以及 Flink 的 data stream API、SQL API,再上面是一些平台方比如 sql 入口、定時排程平台等,此外還有一些流批一體的探索,最上面是各種使用者比如視訊、商業化等。

流批一體中,流的特性是低延時,批的特性是高吞吐。針對流批一體,我們期待系統既能處理 unfield batch 資料,也可以調整資料塊的 shuffle 大小等來均衡作業的吞吐和時延。

Flink 引擎在快手的深度優化與生産實踐

快手内部對流批一體進行了很多探索,我們為存儲資料建立了統一的 Schema 标準,包括流表和批表,使用者可以使用相同的代碼來處理流表和批表,隻是配置不同。産生的結果也需要符合統一的 Schema 标準,這樣就可以打通上下遊,實作盡可能多的邏輯複用。Schema 統一是我們快手資料治理的一部分,湖倉一體等場景也都有這個需求。

應用場景主要包括以下幾個方面:

  • 名額計算,比如實時名額和報表計算。
  • 資料回溯,利用已有的離線資料重新生成其他名額。
  • 數倉加速,主要是資料倉庫和資料湖的實時加速。

流批一體帶來的收益是多方面的,首先是降低了開發和運維成本,實作了盡可能多的代碼邏輯複用,運維不再需要維護多個系統。其次是實時處理和批處理的口徑保持一緻,保障了最終結果的一緻。最後是資源方面的收益,有些場景隻需要一套實時系統。

Flink 引擎在快手的深度優化與生産實踐

我們在排程方面進行了優化。如上圖所示的三個 task,起初 a 和 c 已經完成,b 還在運作。這時 a 失敗了,按照預設的政策 ABC 都需要重新運作,即便 c 已經完成。在實際場景中會有大量的 c 進行重算,帶來巨大的資源損耗。針對這種情況如果,我們預設開啟了以下政策:如果 a 的結果是決定性的(實際上大部分批處理的輸出都是決定性的),可以不再重算 c,隻需計算 a 和 b。

Flink 引擎在快手的深度優化與生産實踐

上圖是我們快手内部針對批處理的優化和改進。

第一個是 shuffle service,現在既有内部的內建,也在試用社群的版本,主要是為了實作存儲和計算的解耦,同時提高 shuffle 的性能。第二個是動态資源的排程,主要是根據資料量來自動決定算子的并發,避免人工反複調整。第三個是慢節點規避,也叫推測執行,主要是為了減少長尾效應,減少總執行時間。第四個是 hive 的優化,比如 UDF 适配、文法相容。另外針對 partition 生成 split,我們增加了緩存、多線程生成等方式,極大減少了分片的時間。最後是一些壓縮方式的支援,比如支援 gzip、zstd 等。

五、未來規劃

  • 首先是實時計算,進一步增強 Flink 的性能、穩定性和應用性,并通過實時計算來加速各種業務場景。
  • 第二個是線上和離線的統一,包含實時、近實時和批處理。我們期待能用 Flink 統一快手的資料同步、轉換和在離線計算,讓ETL、數倉、資料湖處理等各類場景,都使用一套 Flink 計算系統。
  • 最後一個是彈性可伸縮,主要是雲原生相關,包含在離線混部和作業的彈性伸縮等。