作者:邱從賢(山智)
衆所周知 Flink 是目前廣泛使用的計算引擎,Flink 使用 checkpoint 機制進行容錯處理[1],Flink 的 checkpoint 會将狀态快照備份到分布式存儲系統,供後續恢複使用。在 Alibaba 内部我們使用的存儲主要是 HDFS,當同一個叢集的 Job 到達一定數量後,會對 HDFS 造成非常大的壓力,本文将介紹一種大幅度降低 HDFS 壓力的方法 -- 小檔案合并。
背景
不管使用 FsStateBackend、RocksDBStateBackend 還是 NiagaraStateBackend,Flink 在進行 checkpoint 的時候,TM 會将狀态快照寫到分布式檔案系統中,然後将檔案句柄發給 JM,JM 完成全局 checkpoint 快照的存儲,如下圖所示。

對于全量 checkpoint 來說,TM 将每個 checkpoint 内部的資料都寫到同一個檔案,而對于 RocksDBStateBackend/NiagaraStateBackend 的增量 checkpoint [2]來說,則會将每個 sst 檔案寫到一個分布式系統的檔案内。當作業量很大,且作業的并發很大時,則會對底層 HDFS 形成非常大的壓力:1)大量的 RPC 請求會影響 RPC 的響應時間(如下圖所示);2)大量檔案對 NameNode 記憶體造成很大壓力。
在 Flink 中曾經嘗試使用 ByteStreamStateHandle 來解決小檔案多的問題[3],将小于一定門檻值的 state 直接發送到 JM,由 JM 統一寫到分布式檔案中,進而避免在 TM 端生成小檔案。但是這個方案有一定的局限性,門檻值設定太小,還會有很多小檔案生成,門檻值設定太大,則會導緻 JM 記憶體消耗太多有 OOM 的風險。
1 小檔案合并方案
針對上面的問題我們提出一種解決方案 -- 小檔案合并。
在原來的實作中,每個 sst 檔案會打開一個
CheckpointOutputStream,每個 CheckpointOutputStream 對應一個 FSDataOutputStream,将本地檔案寫往一個分布式檔案,然後關閉 FSDataOutputStream,生成一個 StateHandle。如下圖所示:
小檔案合并則會重用打開的 FSDataOutputStream,直至檔案大小達到預設的門檻值為止,換句話說多個 sst 檔案會重用同一個 DFS 上的檔案,每個 sst 檔案占用 DFS 檔案中的一部分,最終多個 StateHandle 共用一個實體檔案,如下圖所示。
在接下來的章節中我們會描述實作的細節,其中需要重點考慮的地方包括:
-
并發 checkpoint 的支援
Flink 天生支援并發 checkpoint,小檔案合并方案則會将多個檔案寫往同一個分布式存儲檔案中,如果考慮不當,資料會寫串或者損壞,是以我們需要有一種機制保證該方案的正确性,較長的描述參考 2.1 節
-
防止誤删檔案
我們使用引用計數來記錄檔案的使用情況,僅通過檔案引用計數是否降為 0 進行判斷删除,則可能誤删檔案,如何保證檔案不會被錯誤删除,我們将會在 2.2 節進行闡述
-
降低空間放大
使用小檔案合并之後,隻要檔案中還有一個 statehandle 被使用,整個分布式檔案就不能被删除,是以會占用更多的空間,我們在 2.3 節描述了解決該問題的詳細方案
-
異常處理
我們将在 2.4 節闡述如何處理異常情況,包括 JM 異常和 TM 異常的情況
- 2.5 節中會較長的描述在 Checkpoint 被取消或者失敗後,如何取消 TM 端的 Snapshot,如果不取消 TM 端的 Snapshot,則會導緻 TM 端實際運作的 Snapshot 比正常的多
在第 3 節中闡述了小檔案合并方案與現有方案的相容性;第 4 節則會描述小檔案合并方案的優勢和不足;最後在第 5 節我們展示在生産環境下取得的效果。
2 設計實作
本節中我們會較長的描述整個小檔案合并的細節,以及其中的設計要點。
這裡我們大緻回憶一下 TM 端 Snapshot 的過程
- TM 端 barrier 對齊
- TM Snapshot 同步操作
- TM Snapshot 異步操作
其中上傳 sst 檔案到分布式存儲系統在上面的第三步,同一個 checkpoint 内的檔案順序上傳,多個 checkpoint 的檔案上傳可能同時進行。
2.1 并發 checkpoint 支援
Flink 天生支援并發 checkpoint,是以小檔案合并方案也需要能夠支援并發 checkpoint,如果不同 checkpoint 的 sst 檔案同時寫往一個分布式檔案,則會導緻檔案内容損壞,後續無法從該檔案進行 restore。
在 FLINK-11937[4] 的提案中,我們會将每個 checkpoint 的 state 檔案寫到同一個 HDFS 檔案,不同 checkpoint 的 state 寫到不同的 HDFS 檔案 -- 換句話說,HDFS 檔案不跨 Checkpoint 共用,進而避免了多個用戶端同時寫入同一個檔案的情況。
後續我們會繼續推進跨 Checkpoint 共用檔案的方案,當然在跨 Checkpoint 共用檔案的方案中,并行的 Checkpoint 也會寫往不同的 HDFS 檔案。
2.2 防止誤删檔案
複用底層檔案之後,我們使用引用計數追蹤檔案的使用情況,在檔案引用數降為 0 的情況下删除檔案。但是在某些情況下,檔案引用數為 0 的時候,并不代表檔案不會被繼續使用,可能導緻檔案誤删。下面我們會詳>細描述開啟并發 checkpoint 後可能導緻檔案誤删的情況,以及解決方案。
我們以下圖為例,maxConcurrentlyCheckpoint = 2
上圖中共有 3 個 checkpoint,其中 chk-1 已經完成,chk-2 和 chk-3 都基于 chk-1 進行,chk-2 在 chk-3 前完成,chk-3 在注冊
4.sst
的時候發現,發現
4.sst
在 chk-2 中已經注冊過,會重用 chk-2 中
4.sst
對應的 stateHandle,然後取消 chk-3 中的
4.sst
的注冊,并且删除 stateHandle,在處理完 chk-3 中
4.sst
之後,該 stateHandle 對應的分布式檔案的引用計數為 0,如果我們這個時候删除分布式檔案,則會同時删除
5.sst
對應的内容,導緻後續無法從 chk-3 恢複。
這裡的問題是如何在
stateHandle
對應的分布式檔案引用計數降為 0 的時候正确判斷是否還會繼續引用該檔案,是以在整個 checkpoint 完成處理之後再判斷某個分布式檔案能否删除,如果真個 checkpoint 完成發現檔案沒有被引用,則可以安全删除,否則不進行删除。
2.3 降低空間放大
使用小檔案合并方案後,每個 sst 檔案對應分布式檔案中的一個 segment,如下圖所示
檔案僅能在所有 segment 都不再使用時進行删除,上圖中有 4 個 segment,僅 segment-4 被使用,但是整個檔案都不能删除,其中 segment[1-3] 的空間被浪費掉了,從實際生産環境中的資料可知,整體的空間放大率(實際占用的空間 / 真實有用的空間)在 1.3 - 1.6 之間。
為了解決空間放大的問題,在 TM 端起異步線程對放大率超過門檻值的檔案進行壓縮。而且僅對已經關閉的檔案進行壓縮。
整個壓縮的流程如下所示:
- 計算每個檔案的放大率
- 如果放大率較小則直接跳到步驟 7
- 如果檔案 A 的放大率超過門檻值,則生成一個對應的新檔案 A‘(如果這個過程中建立檔案失敗,則由 TM 負責清理工作)
- 記錄 A 與 A’ 的映射關系
- 在下一次 checkpoint X 往 JM 發送落在檔案 A 中的 StateHandle 時,則使用 A` 中的資訊生成一個新的 StateHandle 發送給 JM
- checkpoint X 完成後,我們增加 A‘ 的引用計數,減少 A 的引用計數,在引用計數降為 0 後将檔案 A 删除(如果 JM 增加了 A’ 的引用,然後出現異常,則會從上次成功的 checkpoint 重新建構整個引用計數器)
- 檔案壓縮完成
2.4 異常情況處理
在 checkpoint 的過程中,主要有兩種異常:JM 異常和 TM 異常,我們将分情況闡述。
2.4.1 JM 異常
JM 端主要記錄 StateHandle 以及檔案的引用計數,引用計數相關資料不需要持久化到外存中,是以不需要特殊的處理,也不需要考慮 transaction 等相關操作,如果 JM 發送 failover,則可以直接從最近一次 complete checkpoint 恢複,并重建引用計數即可。
2.4.2 TM 異常
TM 異常可以分為兩種:1)該檔案在之前 checkpoint 中已經彙報過給 JM;2)檔案尚未彙報過給 JM,我們會分情況闡述。
-
檔案已經彙報過給 JM
檔案彙報過給 JM,是以在 JM 端有檔案的引用計數,檔案的删除由 JM 控制,當檔案的引用計數變為 0 之後,JM 将删除該檔案。
-
檔案尚未彙報給 JM
該檔案暫時尚未彙報過給 JM,該檔案不再被使用,也不會被 JM 感覺,成為孤兒檔案。這種情況暫時有外圍工具統一進行清理。
2.5 取消 TM 端 snapshot
像前面章節所說,我們需要在 checkpoint 逾時/失敗時,取消 TM 端的 snapshot,而 Flink 則沒有相應的通知機制,現在 FLINK-8871[5] 在追蹤相應的優化,我們在内部增加了相關實作,當 checkpoint 失敗時會發送 RPC 資料給 TM,TM 端接受到相應的 RPC 消息後,會取消相應的 snapshot。
3 相容性
小檔案合并功能支援從之前的版本無縫遷移過來。從之前的 checkpoint restore 的的步驟如下:
- 每個 TM 分到自己需要 restore 的 state handle
- TM 從遠端下載下傳 state handle 對應的資料
- 從本地進行恢複
小檔案合并主要影響的是第 2 步,從遠端下載下傳對應資料的時候對不同的 StateHandle 進行适配,是以不影響整體的相容性。
4 優勢和不足
- 優勢:大幅度降低 HDFS 的壓力:包括 RPC 壓力以及 NameNode 記憶體的壓力
- 不足:不支援 State 多線程上傳的功能(State 上傳暫時不是 checkpoint 的瓶頸)
5 線上環境的結果
在該方案上線後,對 Namenode 的壓力大幅降低,下面的截圖來自線上生産叢集,從資料來看,檔案建立和關閉的 RPC 有明顯下降,RPC 的響應時間也有大幅度降低,確定順利度過雙十一。
參考文獻
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html[2]
https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html[3]
https://www.slideshare.net/dataArtisans/stephan-ewen-experiences-running-flink-at-very-large-scale[4]
https://issues.apache.org/jira/browse/FLINK-11937[5]
https://issues.apache.org/jira/browse/FLINK-8871