天天看點

Flink Checkpoint 問題排查

作者:小蝦好望角

在 Flink 中,狀态可靠性保證由 Checkpoint 支援,當作業出現 failover 的情況下,Flink 會從最近成功的 Checkpoint 恢複。在生産實踐中,Checkpoint 的問題需要引起我們的重視。

Checkpoint 流程簡介

Checkpoint 是由 JM 的 Checkpoint Coordinator 發起的:

Flink Checkpoint 問題排查

1.1 Coordinator 向所有 Source 節點觸發 Checkpoint

Flink Checkpoint 問題排查

1.2 在資料流中插入 Checkpoint Barrier

source 節點向下遊廣播 barrier,這個 barrier 就是實作 Chandy-Lamport 分布式快照算法的核心,下遊的 task 隻有收到所有 input 的 barrier 才會執行相應的 Checkpoint:

Flink Checkpoint 問題排查

1.3 對算子 State 狀态進行同步快照與異步上傳

當 task 完成 state 備份後,會将備份資料的位址(state handle)通知給 Checkpoint Coordinator。這裡分為同步和異步(如果開啟的話)兩個階段:

Flink Checkpoint 問題排查

同步階段:task執行狀态快照,并寫入外部存儲系統(根據狀态後端的選擇不同有所差別)執行快照的過程:

  • 對 state 做深拷貝
  • 将寫操作封裝在異步的 FutureTask 中,FutureTask 的作用包括:1)打開輸入流;2)寫入狀态的中繼資料資訊;3)寫入狀态;4)關閉輸入流

異步階段:

  • 執行同步階段建立的 FutureTask
  • 向 Checkpoint Coordinator 發送 ACK 響應

1.4 多輸入算子要進行 Barrier 對齊操作

下遊的 sink 節點收集齊上遊兩個 input 的 barrier 之後,會執行本地快照。這裡特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 會全量刷資料到磁盤上(紅色大三角表示),然後 Flink 架構會從中選擇沒有上傳的檔案進行持久化備份(紫色小三角):

Flink Checkpoint 問題排查

同樣的,sink 節點在完成自己的 Checkpoint 之後,會将 state handle 傳回通知 Coordinator:

Flink Checkpoint 問題排查

1.5 所有算子狀态都已上傳後确認 Checkpoint 完成

最後,當 Checkpoint coordinator 收集齊所有 task 的 state handle,就認為這一次的 Checkpoint 全局完成了,向持久化存儲中再備份一個 Checkpoint meta 檔案:

Flink Checkpoint 問題排查

Checkpoint 監控面闆

Checkpoint 監控面闆詳細介紹參考官網連結 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/monitoring/checkpoint_monitoring/,這個頁面上有解釋 End to End Duration、Processed (persisted) in-flight data、Sync Duration、Async Duration、Alignment Duration、Start Delay,如果對這些概念不清楚的同學一定要好好看看。

隻有了解了 Checkpoint 的流程,才能看懂 Flink Web UI 提供的 Checkpoint 資訊:

Flink Checkpoint 問題排查

日志分析示例

我們在 Checkpoint 的 History Tab 可以看到是否有 Checkpoint 失敗:

Flink Checkpoint 問題排查

從上圖中我們發現 Checkpoint 10423 失敗了,借助 Checkpoint 監控面闆,我們可以做初步分析和判斷,至于具體的原因可能還需要通過日志來分析:

1、首先我們可以在 jobmanager.log 中查找關鍵字“checkpoint 10423”,假設我們找到如下内容:

Flink Checkpoint 問題排查
  • 0b60f08bf8984085b59f8d9bc74ce2e1 是 execution id
  • 85d268e6fbc19411185f7e4868a44178 是 job id

2、然後,我們可以在 jobmanager.log 中查找 execution id,找到被排程到哪個 taskmanager 上,類似如下所示:

Flink Checkpoint 問題排查

3、從上面的日志我們知道該 execution 被排程到 hostnameABCDE 的 container_e24_1566836790522_8088_04_013155_1 slot 上,接下來我們就可以到 container container_e24_1566836790522_8088_04_013155 的 taskmanager.log 中查找 Checkpoint 失敗的具體原因

常見的 Checkpoint 慢的情況

Checkpoint 慢可以算是一個高頻問題,比如 Checkpoint interval 1 分鐘,逾時 10 分鐘,Checkpoint 經常需要做 9 分鐘(我們希望 1 分鐘左右就能夠做完),而且我們預期 state size 不是非常大。

4.1 Checkpointed Data Size 太大

狀态資料的大小(在 Checkpoint 監控面闆上看)也會影響 Checkpoint 的時間,并且在 Checkpoint 時 IO 壓力也會較大。對于像 RocksDB 這種支援增量 Checkpoint 的 StateBackend,如果兩次 Checkpoint 之間狀态變化不大,那麼增量 Checkpoint 能夠極大減少狀态上傳時間。

但目前的增量 Checkpoint 仍存在一些問題:

  • 一是不通用,不是所有的 StateBackend 都能夠支援增量 Checkpoint
  • 二是存在由于狀态合并的影響,增量狀态資料仍會非常大

[FLIP-158: Generalized incremental checkpoints](https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints) 提出了一種通用的增量快照方案,其核心思想是基于 state changelog,changelog 能夠細粒度地記錄狀态資料的變化,如下圖所示:

Flink Checkpoint 問題排查

4.2 作業存在反壓或者資料傾斜

我們知道 task 僅在接受到所有的 barrier 之後才會進行 snapshot,如果作業存在反壓,或者有資料傾斜,則會導緻全部的 channel 或者某些 channel 的 barrier 發送慢,進而整體影響 Checkpoint 的時間,我們可以通過如下的頁面進行檢查:

Flink Checkpoint 問題排查

上圖中我們選擇了一個 task,檢視所有 subtask 的反壓情況,發現都是 high,表示反壓情況嚴重,這種情況下會導緻下遊接收 barrier 比較晚。

分析反壓的大緻思路是:

  • 如果一個 Subtask 的發送端 Buffer 占用率很高,則表明它被下遊反壓限速了
  • 如果一個 Subtask 的接受端 Buffer 占用很高,則表明它将反壓傳導至上遊

反壓情況可以根據以下表格進行對号入座:

Flink Checkpoint 問題排查

對于 Flink 1.9 及以上版本,除了上述的表格,我們還可以根據 floatingBuffersUsage/exclusiveBuffersUsage(其中inPoolUsage 等于 floatingBuffersUsage 與 exclusiveBuffersUsage 的總和)以及其上遊 Task 的 outPoolUsage 來進行進一步的分析一個 Subtask 和其上遊 Subtask 的資料傳輸:

Flink Checkpoint 問題排查

通常來說,floatingBuffersUsage 為高則表明反壓正在傳導至上遊,而 exclusiveBuffersUsage 則表明了反壓是否存在傾斜(floatingBuffersUsage 高、exclusiveBuffersUsage 低為有傾斜,因為少數 channel 占用了大部分的 Floating Buffer)。參考連結 [How to identify the source of backpressure](https://flink.apache.org/2021/07/07/backpressure.html) 和 [Monitoring, Metrics, and that Backpressure Thing](https://flink.apache.org/2019/07/23/flink-network-stack-2.html)

再來看看資料傾斜:

Flink Checkpoint 問題排查

上圖中我們選擇其中一個 operator,點選所有的 subtask,然後按照 Records Received/Bytes Received/TPS 從大到小進行排序,能看到前面幾個 subtask 會比其他的 subtask 要處理的資料多。

如果存在反壓或者資料傾斜的情況,我們需要首先解決反壓或者資料傾斜問題之後,再檢視 Checkpoint 的時間是否符合預期。

4.3 Barrier 對齊慢

從前面我們知道 Checkpoint 在 task 端分為 barrier 對齊(收齊所有上遊發送過來的 barrier),然後開始同步階段,再做異步階段。如果 barrier 一直對不齊的話,就不會開始做 snapshot。

Flink Checkpoint 問題排查

注意:這裡我們要了解和區分 Start Delay 和 Alignment Duration。

針對 Barrier 對齊慢的解決思路有兩個:

Flink Checkpoint 問題排查

1、讓 barrier 能跳過 buffer 中緩存的資料,對應 [FLIP-76: Unaligned Checkpoints](https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints),Unaligned Checkpoints 最根本的思想就是将緩沖的資料當做算子狀态的一部分,該機制仍會使用 barrier,用來觸發 checkpoint,其原理如下圖所示:

Flink Checkpoint 問題排查

2、讓 buffer 中的資料變少,對應 [FLIP-183: Dynamic buffer size adjustment (Buffer debloat) ](https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment),其思想是動态調整 Buffer 大小,隻緩存配置時間内可以處理的資料量,進而可以預估和控制 barrier 對齊所需的時間

Flink Checkpoint 問題排查

4.4 主線程太忙導緻沒機會做 snapshot

在 task 端,所有的處理都是單線程的,資料處理和 barrier 處理都由主線程處理,如果主線程在處理太慢(比如使用 RocksDBBackend,state 操作慢導緻整體處理慢),導緻 barrier 處理的慢,也會影響整體 Checkpoint 的進度,在這一步我們需要能夠檢視某個 PID 對應熱點方法,這裡推薦兩個方法:

1、多次連續 jstack,檢視一直處于 RUNNABLE 狀态的線程有哪些

2、使用工具 AsyncProfile dump 一份火焰圖,檢視占用 CPU 最多的棧。比方說,對 TaskManager 進行 CPU profile,從中我們可以分析到 Task Thread 是否跑滿一個 CPU 核:

  • 如果是的話要分析 CPU 主要花費在哪些函數裡面
  • 如果不是的話要看 Task Thread 阻塞在哪裡,可能是使用者函數本身有些同步的調用,也可能是記憶體資源不足或 GC(包括 TaskManager JVM 各區記憶體不合理導緻的頻繁 Full GC 甚至失聯)等原因導緻的暫時的暫停
  • 推薦給 TaskManager 啟用 G1 垃圾回收器來優化 GC,并加上 -XX:+PrintGCDetails 來列印 GC 日志的方式來觀察 GC 的問題

4.5 同步階段做的慢

同步階段一般不會太慢,但是如果我們通過日志發現同步階段比較慢,那麼可以分兩類情況來分析:

  • 對于非 RocksDBBackend 我們可以考慮檢視是否開啟了異步 snapshot,如果開啟了異步 snapshot 還是慢,需要看整個 JVM 在幹嘛,也可以使用上面提到的工具
  • 對于 RocksDBBackend 來說,我們可以用 iostate 檢視磁盤的壓力如何,另外可以檢視 tm 端 RocksDB 的 log 的日志如何,檢視其中 snapshot 的時間總共開銷多少

4.6 異步階段做的慢

對于異步階段來說,tm 端主要将 state 備份到持久化存儲上:

  • 對于非 RocksDBBackend 來說,主要瓶頸來自于網絡,這個階段可以考慮觀察網絡的 metric,或者對應機器上能夠觀察到網絡流量的情況(比如 iftop)
  • 對于 RocksDB 來說,則需要從本地讀取檔案,寫入到遠端的持久化存儲上,是以不僅需要考慮網絡的瓶頸,還需要考慮本地磁盤的性能;另外對于 RocksDBBackend 來說,如果覺得網絡流量不是瓶頸,但是上傳比較慢的話,還可以嘗試考慮開啟多線程上傳功能

常見的檢查點和狀态問題

1、Received checkpoint barrier for checkpoint <cp_id> before completing current checkpoint <cp_id>. Skipping current checkpoint.

在目前檢查點還未做完時,收到了更新的檢查點的 barrier,表示目前檢查點不再需要而被取消掉,一般不需要特殊處理。

2、Checkpoint <cp_id> expired before completing

首先應檢查 CheckpointConfig.setCheckpointTimeout() 方法設定的檢查點逾時,如果設的太短,适當改長一點。另外就是考慮發生了反壓或資料傾斜,或者 barrier 對齊太慢。

3、org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible

Flink 的狀态是按 key 組織并儲存的,如果程式邏輯内改了 keyBy() 邏輯或者 key 的序列化邏輯,就會導緻檢查點/儲存點的資料無法正确恢複。是以如果必須要改key相關的東西,就棄用之前的狀态資料吧。

4、org.apache.flink.util.StateMigrationException: The new serializer for a MapState requires state migration in order for the job to proceed. However, migration for MapState currently isn't supported

在 1.9 之前的 Flink 版本中,如果我們使用 RocksDB 狀态後端,并且更改了自用 MapState的schema,恢複作業時會抛出此異常,表示不支援更改 schema。這個問題已經在 FLINK-11947 解決,更新版本即可。

5、時鐘不同步導緻無法啟動

啟動Flink任務的時候報錯 Caused by: java.lang.RuntimeException: Couldn't deploy Yarn cluster。

然後仔細看發現:system times on machines may be out of sync。

意思說是機器上的系統時間可能不同步。同步叢集機器時間即可。

繼續閱讀