作者:李康
摘要:本文主要分享 Flink 的 CheckPoint 機制、反壓機制及 Flink 的記憶體模型。對這3部分内容的熟悉是調優的前提,文章主要從以下幾個部分分享:
- 原理剖析
- 性能定位
- 經典場景調優
- 記憶體調優
Checkpoint 機制
1.什麼是 checkpoint
簡單地說就是 Flink 為了達到容錯和 exactly-once 語義的功能,定期把 state 持久化下來,而這一持久化的過程就叫做 checkpoint ,它是 Flink Job 在某一時刻全局狀态的快照。
當我們要對分布式系統實作一個全局狀态保留的功能時,傳統方案會引入一個統一時鐘,通過分布式系統中的 master 節點廣播出去給每一個 slaves 節點,當節點接收到這個統一時鐘時,它們就記錄下自己目前的狀态即可。

但是統一時鐘的方式也存在一定的問題,某一個 node 進行的 GC 時間比較長,或者 master 與 slaves 的網絡在當時存在波動而造成時鐘的發送延遲或者發送失敗,都會造成此 slave 和其它的機器出現資料不一緻而最終導緻腦裂的情況。如果我們想要解決這個問題,就需要對 master 和 slaves 做一個 HA(High Availability)。但是,一個系統越是複雜,就越不穩定且維護成本越高。
Flink 是将 checkpoint 都放進了一個名為 Barrier 的流。
上圖中就是一個 Barrier 的例子,從上遊的第一個 Task 到下遊的最後一個 Task,每次當 Task 經過圖中藍色的栅欄時,就會觸發 save snapshot(快照)的功能。我們用一個例子來簡單說明。
2.執行個體分析
這是一個簡單的 ETL 過程,首先我們把資料從 Kafka 中拿過來進行一個 trans 的轉換操作,然後再發送到一個下遊的 Kafka
此時這個例子中沒有進行 chaining 的調優。是以此時采用的是 forward strategy ,也就是 “一個 task 的輸出隻發送給一個 task 作為輸入”,這樣的方式,這樣做也有一個好處就是如果兩個 task 都在一個 JVM 中的話,那麼就可以避免不必要的網絡開銷
設定 Parallism 為 2,此時的 DAG 圖如下:
■ CK的分析過程
每一個 Flink 作業都會有一個 JobManager ,JobManager 裡面又會有一個 checkpoint coordinator 來管理整個 checkpoint 的過程,我們可以設定一個時間間隔讓 checkpoint coordinator 将一個 checkpoint 的事件發送給每一個 Container 中的 source task,也就是第一個任務(對應并行圖中的 task1,task2)。
當某個 Source 算子收到一個 Barrier 時,它會暫停自身的資料處理,然後将自己的目前 state 制作成 snapshot(快照),并儲存到指定的持久化存儲中,最後向 CheckpointCoordinator 異步發送一個 ack(Acknowledge character --- 确認字元),同時向自身所有下遊算子廣播該 Barrier 後恢複自身的資料處理。
每個算子按照上面不斷制作 snapshot 并向下遊廣播,直到最後 Barrier 傳遞到 sink 算子,此時快照便制作完成。這時候需要注意的是,上遊算子可能是多個資料源,對應多個 Barrier 需要全部到齊才一次性觸發 checkpoint ,是以在遇到 checkpoint 時間較長的情況時,有可能是因為資料對齊需要耗費的時間比較長所造成的。
■ Snapshot & Recover
如圖,這是我們的Container容器初始化的階段,e1 和 e2 是剛從 Kafka 消費過來的資料,與此同時,CheckpointCoordinator 也往它發送了 Barrier。
此時 Task1 完成了它的 checkpoint 過程,效果就是記錄下 offset 為2(e1,e2),然後把 Barrier 往下遊的算子廣播,Task3 的輸入為 Task1 的輸出,現在假設我的這個程式的功能是統計資料的條數,此時 Task3 的 checkpoint 效果就是就記錄資料數為2(因為從 Task1 過來的資料就是 e1 和 e2 兩條),之後再将 Barrier 往下廣播,當此 Barrier 傳遞到 sink 算子,snapshot 就算是制作完成了。
此時 source 中還會源源不斷的産生資料,并産生新的 checkpoint ,但是此時如果 Container 當機重新開機就需要進行資料的恢複了。剛剛完成的 checkpoint 中 offset為2,count為2,那我們就按照這個 state 進行恢複。此時 Task1 會從 e3 開始消費,這就是 Recover 操作。
■ checkpoint 的注意事項
下面列舉的3個注意要點都會影響到系統的吞吐,在實際開發過程中需要注意:
3.背壓的産生及 Flink 的反壓處理
在分布式系統中經常會出現多個 Task 多個 JVM 之間可能需要做資料的交換,我們使用生産者和消費者來說明這個事情。
假設我現在的 Producer 是使用了無界 buffer 來進行存儲,當我們的生産者生産速度遠大于消費者消費的速度時,生産端的資料會因為消費端的消費能力低下而導緻資料積壓,最終導緻 OOM 的産生。
而就算使用了有界 buffer,同樣消費者端的消費能力低下,當 buffer 被積滿時生産者就會停止生産,這樣還不能完全地解決我們的問題,是以就需要根據不同的情況進行調整。
Flink 也是通過有界 buffer 來進行不同 TaskManager 的資料交換。而且做法分為了靜态控流和動态控流兩種方式。
簡單來說就是當生産者比消費者的 TPS 多時,我們采用溢寫的方式,使用 batch 來封裝好我們的資料,然後分批發送出去,每次發送完成後再 sleep 一段時間,這個時間的計算方式是 left(剩餘的資料)/ tps,但是這個做法是很難去預估系統的情況的。
Flink 1.5 之前的流控是基于 TCP 的滑動視窗實作的,在之前的課程中已經有提到過了。而 Flink 在1.5之後已經棄用了該機制,是以這裡不展開說明。在此網絡模型中,資料生成節點隻能通過檢查目前的 channel 是否可寫來決定自己是否要向消費端發送資料,它對下遊資料消費端的真實容量情況一概不知。這就導緻,當生成節點發現 channel 已經不可寫的時候,有可能下遊消費節點已經積壓了很多資料。
Credit-Based 我們用下面的資料交換的例子說明:
Flink 的資料交換大緻分為三種,一種是同一個 Task 的資料交換,另一種是 不同 Task 同 JVM 下的資料交換。第三種就是不同 Task 且不同 JVM 之間的交換。
同一個 Task 的資料交換就是我們剛剛提到的 forward strategy 方式,主要就是避免了序列化和網絡的開銷。
第二種資料交換的方式就是資料會先通過一個 record Writer ,資料在裡面進行序列化之後再傳遞給 Result Partition ,之後資料會通過 local channel 傳遞給另外一個 Task 的 Input Gate 裡面,再進行反序列化,推送給 Record Reader 之後進行操作。
因為第三種資料交換涉及到了不同的 JVM,是以會有一定的網絡開銷,和第二種的差別就在于它先推給了 Netty ,通過netty把資料推送到遠端端的 Task 上。
■ Credit-Based
此時我們可以看到 event1 已經連帶一個 backlog = 1 推送給了 TaskB,backlog 的作用其實隻是為了讓消費端感覺到我們生産端的情況
此時 event1 被 TaskB 接收後,TaskB會傳回一個 ack 給 TaskA,同時傳回一個credit = 3,這個是告知 TaskA 它還能接收多少條資料,Flink 就是通過這種互相告知的方式,來讓生産者和消費者都能感覺到對方的狀态。
此時經過一段時間之後,TaskB中的有界 buffer 已經滿了,此時 TaskB回複 credit = 0 給 TaskA,此時 channel 通道将會停止工作,TaskA 不再将資料發往 TaskB。
此時再經過一段時間,TaskA 中的有界 Buffer 也已經出現了資料積壓,是以我們平時遇到的吞吐下降,處理延遲的問題,就是因為此時整個系統相當于一個停滞的狀态,如圖二示,所有的過程都被打上 “X”,表示這些過程都已經停止工作。
JVM 是一個非常複雜的系統,當其記憶體不足時會造成 OOM ,導緻系統的崩潰。Flink 在拿到我們配置設定的記憶體之後會先配置設定一個 cutoff 預留記憶體,保證系統的安全性。Netword buffers 其實就是對應我們剛剛一直提到的有界 buffer,momery manager 是一個記憶體池,這部分的記憶體可以設定為堆内或者堆外的記憶體,當然在流式作業中我們一般設定其為堆外記憶體,而 Free 部分就是提供給使用者使用的記憶體塊。
現在我們假設配置設定給此 TaskManager 的記憶體是 8g。
- 首先是要砍掉 cutoff 的部分,預設是0.25,是以我們的可用記憶體就是 8gx0.75
- network buffers 占用可用記憶體的 0.1 ,是以是 6144x0.1
- 堆内/堆外記憶體為可用記憶體減去 network buffers 的部分,再乘以 0.8
- 給到使用者使用的記憶體就是堆記憶體剩下的 0.2 那部分
其實真實情況是 Flink 是先知道了 heap 記憶體的大小然後逆推出其它記憶體的大小。
Flink 作業的問題定位
1.問題定位口訣
“一壓二查三名額,延遲吞吐是核心。
時刻關注資源量 , 排查首先看GC。”
一壓是指背壓,遇到問題先看背壓的情況,二查就是指 checkpoint ,對齊資料的時間是否很長,state 是否很大,這些都是和系統吞吐密切相關的,三名額就是指 Flink UI 那塊的一些展示,我們的主要關注點其實就是延遲和吞吐,系統資源,還有就是 GC logs。
- 看反壓:通常最後一個被壓高的 subTask 的下遊就是 job 的瓶頸之一。
- 看 Checkpoint 時長:Checkpoint 時長能在一定程度影響 job 的整體吞吐。
- 看核心名額:名額是對一個任務性能精準判斷的依據,延遲名額和吞吐則是其中最為關鍵的名額。
- 資源的使用率:提高資源的使用率是最終的目的。
■ 常見的性能問題
簡單解釋一下:
- 在關注背壓的時候大家往往忽略了資料的序列化和反序列化過程所造成的性能問題。
- 一些資料結構,比如 HashMap 和 HashSet 這種 key 需要經過 hash 計算的資料結構,在資料量大的時候使用 keyby 進行操作, 造成的性能影響是非常大的。
- 資料傾斜是我們的經典問題,後面再進行展開。
- 如果我們的下遊是 MySQL,HBase 這種,我們都會進行一個批處理的操作,就是讓資料存儲到一個 buffer 裡面,在達到某些條件的時候再進行發送,這樣做的目的就是減少和外部系統的互動,降低網絡開銷的成本。
- 頻繁 GC ,無論是 CMS 也好,G1 也好,在進行 GC 的時候,都會停止整個作業的運作,GC 時間較長還會導緻 JobManager 和 TaskManager 沒有辦法準時發送心跳,此時 JobManager 就會認為此 TaskManager 失聯,它就會另外開啟一個新的 TaskManager
- 視窗是一種可以把無限資料切割為有限資料塊的手段。比如我們知道,使用滑動視窗的時候資料的重疊問題,size = 5min 雖然不屬于大視窗的範疇,可是 step = 1s 代表1秒就要進行一次資料的處理,這樣就會造成資料的重疊很高,資料量很大的問題。
2.Flink 作業調優
我們可以通過一些資料結構,比如 Set 或者 Map 來結合 Flink state 進行去重。但是這些去重方案會随着資料量不斷增大,進而導緻性能的急劇下降,比如剛剛我們分析過的 hash 沖突帶來的寫入性能問題,記憶體過大導緻的 GC 問題,TaskManger 的失聯問題。
方案二和方案三也都是通過一些資料結構的手段去進行去重,有興趣的同學可以自行下去了解,在這裡不再展開。
■ 資料傾斜
資料傾斜是大家都會遇到的高頻問題,解決的方案也不少。
第一種場景是當我們的并發度設定的比分區數要低時,就會造成上面所說的消費不均勻的情況。
第二種提到的就是 key 分布不均勻的情況,可以通過添加随機字首打散它們的分布,使得資料不會集中在幾個 Task 中。
在每個節點本地對相同的 key 進行一次聚合操作,類似于 MapReduce 中的本地 combiner。map-side 預聚合之後,每個節點本地就隻會有一條相同的 key,因為多條相同的 key 都被聚合起來了。其他節點在拉取所有節點上的相同 key 時,就會大大減少需要拉取的資料數量,進而也就減少了磁盤 IO 以及網絡傳輸開銷。
■ 記憶體調優
Flink 的記憶體結構剛剛我們已經提及到了,是以我們清楚,調優的方面主要是針對 非堆記憶體 Network buffer ,manager pool 和堆記憶體的調優,這些基本都是通過參數來進行控制的。
這些參數我們都需要結合自身的情況去進行調整,這裡隻給出一些建議。而且對于 ManagerBuffer 來說,Flink 的流式作業現在并沒有過多使用到這部分的記憶體,是以我們都會設定得比較小,不超過0.3。
堆記憶體的調優是關于 JVM 方面的,主要就是将預設使用的垃圾回收器改為 G1 ,因為預設使用的 Parallel Scavenge 對于老年代的 GC 存在一個串行化的問題,它的 Full GC 耗時較長,下面是關于 G1 的一些介紹,網上資料也非常多,這裡就不展開說明了。
總 結
本文帶大家了解了 Flink 的 CheckPoint 機制,反壓機制及 Flink 的記憶體模型和基于記憶體模型分析了一些調優的政策。希望能對大家有所幫助,原文分享的視訊回顧可移步下方連結:
https://ververica.cn/developers/flink-training-course-operation/