天天看點

阿裡巴巴大規模應用Flink的踩坑經驗:如何大幅降低 HDFS 壓力?背景1 小檔案合并方案2 設計實作3 相容性4 優勢和不足5 線上環境的結果參考文獻

作者:邱從賢(山智)

衆所周知 Flink 是目前廣泛使用的計算引擎,Flink 使用 checkpoint 機制進行容錯處理[1],Flink 的 checkpoint 會将狀态快照備份到分布式存儲系統,供後續恢複使用。在 Alibaba 内部我們使用的存儲主要是 HDFS,當同一個叢集的 Job 到達一定數量後,會對 HDFS 造成非常大的壓力,本文将介紹一種大幅度降低 HDFS 壓力的方法 -- 小檔案合并。

背景

不管使用 FsStateBackend、RocksDBStateBackend 還是 NiagaraStateBackend,Flink 在進行 checkpoint 的時候,TM 會将狀态快照寫到分布式檔案系統中,然後将檔案句柄發給 JM,JM 完成全局 checkpoint 快照的存儲,如下圖所示。

阿裡巴巴大規模應用Flink的踩坑經驗:如何大幅降低 HDFS 壓力?背景1 小檔案合并方案2 設計實作3 相容性4 優勢和不足5 線上環境的結果參考文獻

對于全量 checkpoint 來說,TM 将每個 checkpoint 内部的資料都寫到同一個檔案,而對于 RocksDBStateBackend/NiagaraStateBackend 的增量 checkpoint [2]來說,則會将每個 sst 檔案寫到一個分布式系統的檔案内。當作業量很大,且作業的并發很大時,則會對底層 HDFS 形成非常大的壓力:1)大量的 RPC 請求會影響 RPC 的響應時間(如下圖所示);2)大量檔案對 NameNode 記憶體造成很大壓力。

阿裡巴巴大規模應用Flink的踩坑經驗:如何大幅降低 HDFS 壓力?背景1 小檔案合并方案2 設計實作3 相容性4 優勢和不足5 線上環境的結果參考文獻
阿裡巴巴大規模應用Flink的踩坑經驗:如何大幅降低 HDFS 壓力?背景1 小檔案合并方案2 設計實作3 相容性4 優勢和不足5 線上環境的結果參考文獻

在 Flink 中曾經嘗試使用 ByteStreamStateHandle 來解決小檔案多的問題[3],将小于一定門檻值的 state 直接發送到 JM,由 JM 統一寫到分布式檔案中,進而避免在 TM 端生成小檔案。但是這個方案有一定的局限性,門檻值設定太小,還會有很多小檔案生成,門檻值設定太大,則會導緻 JM 記憶體消耗太多有 OOM 的風險。

1 小檔案合并方案

針對上面的問題我們提出一種解決方案 -- 小檔案合并。

在原來的實作中,每個 sst 檔案會打開一個

CheckpointOutputStream,每個 CheckpointOutputStream 對應一個 FSDataOutputStream,将本地檔案寫往一個分布式檔案,然後關閉 FSDataOutputStream,生成一個 StateHandle。如下圖所示:

阿裡巴巴大規模應用Flink的踩坑經驗:如何大幅降低 HDFS 壓力?背景1 小檔案合并方案2 設計實作3 相容性4 優勢和不足5 線上環境的結果參考文獻

小檔案合并則會重用打開的 FSDataOutputStream,直至檔案大小達到預設的門檻值為止,換句話說多個 sst 檔案會重用同一個 DFS 上的檔案,每個 sst 檔案占用 DFS 檔案中的一部分,最終多個 StateHandle 共用一個實體檔案,如下圖所示。

阿裡巴巴大規模應用Flink的踩坑經驗:如何大幅降低 HDFS 壓力?背景1 小檔案合并方案2 設計實作3 相容性4 優勢和不足5 線上環境的結果參考文獻

在接下來的章節中我們會描述實作的細節,其中需要重點考慮的地方包括:

  1. 并發 checkpoint 的支援

    Flink 天生支援并發 checkpoint,小檔案合并方案則會将多個檔案寫往同一個分布式存儲檔案中,如果考慮不當,資料會寫串或者損壞,是以我們需要有一種機制保證該方案的正确性,較長的描述參考 2.1 節

  2. 防止誤删檔案

    我們使用引用計數來記錄檔案的使用情況,僅通過檔案引用計數是否降為 0 進行判斷删除,則可能誤删檔案,如何保證檔案不會被錯誤删除,我們将會在 2.2 節進行闡述

  3. 降低空間放大

    使用小檔案合并之後,隻要檔案中還有一個 statehandle 被使用,整個分布式檔案就不能被删除,是以會占用更多的空間,我們在 2.3 節描述了解決該問題的詳細方案

  4. 異常處理

    我們将在 2.4 節闡述如何處理異常情況,包括 JM 異常和 TM 異常的情況

  5. 2.5 節中會較長的描述在 Checkpoint 被取消或者失敗後,如何取消 TM 端的 Snapshot,如果不取消 TM 端的 Snapshot,則會導緻 TM 端實際運作的 Snapshot 比正常的多

在第 3 節中闡述了小檔案合并方案與現有方案的相容性;第 4 節則會描述小檔案合并方案的優勢和不足;最後在第 5 節我們展示在生産環境下取得的效果。

2 設計實作

本節中我們會較長的描述整個小檔案合并的細節,以及其中的設計要點。

這裡我們大緻回憶一下 TM 端 Snapshot 的過程

  1. TM 端 barrier 對齊
  2. TM Snapshot 同步操作
  3. TM Snapshot 異步操作

其中上傳 sst 檔案到分布式存儲系統在上面的第三步,同一個 checkpoint 内的檔案順序上傳,多個 checkpoint 的檔案上傳可能同時進行。

2.1 并發 checkpoint 支援

Flink 天生支援并發 checkpoint,是以小檔案合并方案也需要能夠支援并發 checkpoint,如果不同 checkpoint 的 sst 檔案同時寫往一個分布式檔案,則會導緻檔案内容損壞,後續無法從該檔案進行 restore。

在 FLINK-11937[4] 的提案中,我們會将每個 checkpoint 的 state 檔案寫到同一個 HDFS 檔案,不同 checkpoint 的 state 寫到不同的 HDFS 檔案 -- 換句話說,HDFS 檔案不跨 Checkpoint 共用,進而避免了多個用戶端同時寫入同一個檔案的情況。

後續我們會繼續推進跨 Checkpoint 共用檔案的方案,當然在跨 Checkpoint 共用檔案的方案中,并行的 Checkpoint 也會寫往不同的 HDFS 檔案。

2.2 防止誤删檔案

複用底層檔案之後,我們使用引用計數追蹤檔案的使用情況,在檔案引用數降為 0 的情況下删除檔案。但是在某些情況下,檔案引用數為 0 的時候,并不代表檔案不會被繼續使用,可能導緻檔案誤删。下面我們會詳>細描述開啟并發 checkpoint 後可能導緻檔案誤删的情況,以及解決方案。

我們以下圖為例,maxConcurrentlyCheckpoint = 2

阿裡巴巴大規模應用Flink的踩坑經驗:如何大幅降低 HDFS 壓力?背景1 小檔案合并方案2 設計實作3 相容性4 優勢和不足5 線上環境的結果參考文獻

上圖中共有 3 個 checkpoint,其中 chk-1 已經完成,chk-2 和 chk-3 都基于 chk-1 進行,chk-2 在 chk-3 前完成,chk-3 在注冊

4.sst

的時候發現,發現

4.sst

在 chk-2 中已經注冊過,會重用 chk-2 中

4.sst

對應的 stateHandle,然後取消 chk-3 中的

4.sst

的注冊,并且删除 stateHandle,在處理完 chk-3 中

4.sst

之後,該 stateHandle 對應的分布式檔案的引用計數為 0,如果我們這個時候删除分布式檔案,則會同時删除

5.sst

對應的内容,導緻後續無法從 chk-3 恢複。

這裡的問題是如何在

stateHandle

對應的分布式檔案引用計數降為 0 的時候正确判斷是否還會繼續引用該檔案,是以在整個 checkpoint 完成處理之後再判斷某個分布式檔案能否删除,如果真個 checkpoint 完成發現檔案沒有被引用,則可以安全删除,否則不進行删除。

2.3 降低空間放大

使用小檔案合并方案後,每個 sst 檔案對應分布式檔案中的一個 segment,如下圖所示

阿裡巴巴大規模應用Flink的踩坑經驗:如何大幅降低 HDFS 壓力?背景1 小檔案合并方案2 設計實作3 相容性4 優勢和不足5 線上環境的結果參考文獻

檔案僅能在所有 segment 都不再使用時進行删除,上圖中有 4 個 segment,僅 segment-4 被使用,但是整個檔案都不能删除,其中 segment[1-3] 的空間被浪費掉了,從實際生産環境中的資料可知,整體的空間放大率(實際占用的空間 / 真實有用的空間)在 1.3 - 1.6 之間。

為了解決空間放大的問題,在 TM 端起異步線程對放大率超過門檻值的檔案進行壓縮。而且僅對已經關閉的檔案進行壓縮。

整個壓縮的流程如下所示:

  1. 計算每個檔案的放大率
  2. 如果放大率較小則直接跳到步驟 7
  3. 如果檔案 A 的放大率超過門檻值,則生成一個對應的新檔案 A‘(如果這個過程中建立檔案失敗,則由 TM 負責清理工作)
  4. 記錄 A 與 A’ 的映射關系
  5. 在下一次 checkpoint X 往 JM 發送落在檔案 A 中的 StateHandle 時,則使用 A` 中的資訊生成一個新的 StateHandle 發送給 JM
  6. checkpoint X 完成後,我們增加 A‘ 的引用計數,減少 A 的引用計數,在引用計數降為 0 後将檔案 A 删除(如果 JM 增加了 A’ 的引用,然後出現異常,則會從上次成功的 checkpoint 重新建構整個引用計數器)
  7. 檔案壓縮完成

2.4 異常情況處理

在 checkpoint 的過程中,主要有兩種異常:JM 異常和 TM 異常,我們将分情況闡述。

2.4.1 JM 異常

JM 端主要記錄 StateHandle 以及檔案的引用計數,引用計數相關資料不需要持久化到外存中,是以不需要特殊的處理,也不需要考慮 transaction 等相關操作,如果 JM 發送 failover,則可以直接從最近一次 complete checkpoint 恢複,并重建引用計數即可。

2.4.2 TM 異常

TM 異常可以分為兩種:1)該檔案在之前 checkpoint 中已經彙報過給 JM;2)檔案尚未彙報過給 JM,我們會分情況闡述。

  1. 檔案已經彙報過給 JM

    檔案彙報過給 JM,是以在 JM 端有檔案的引用計數,檔案的删除由 JM 控制,當檔案的引用計數變為 0 之後,JM 将删除該檔案。

  2. 檔案尚未彙報給 JM

    該檔案暫時尚未彙報過給 JM,該檔案不再被使用,也不會被 JM 感覺,成為孤兒檔案。這種情況暫時有外圍工具統一進行清理。

2.5 取消 TM 端 snapshot

像前面章節所說,我們需要在 checkpoint 逾時/失敗時,取消 TM 端的 snapshot,而 Flink 則沒有相應的通知機制,現在 FLINK-8871[5] 在追蹤相應的優化,我們在内部增加了相關實作,當 checkpoint 失敗時會發送 RPC 資料給 TM,TM 端接受到相應的 RPC 消息後,會取消相應的 snapshot。

3 相容性

小檔案合并功能支援從之前的版本無縫遷移過來。從之前的 checkpoint restore 的的步驟如下:

  1. 每個 TM 分到自己需要 restore 的 state handle
  2. TM 從遠端下載下傳 state handle 對應的資料
  3. 從本地進行恢複

小檔案合并主要影響的是第 2 步,從遠端下載下傳對應資料的時候對不同的 StateHandle 進行适配,是以不影響整體的相容性。

4 優勢和不足

  • 優勢:大幅度降低 HDFS 的壓力:包括 RPC 壓力以及 NameNode 記憶體的壓力
  • 不足:不支援 State 多線程上傳的功能(State 上傳暫時不是 checkpoint 的瓶頸)

5 線上環境的結果

在該方案上線後,對 Namenode 的壓力大幅降低,下面的截圖來自線上生産叢集,從資料來看,檔案建立和關閉的 RPC 有明顯下降,RPC 的響應時間也有大幅度降低,確定順利度過雙十一。

阿裡巴巴大規模應用Flink的踩坑經驗:如何大幅降低 HDFS 壓力?背景1 小檔案合并方案2 設計實作3 相容性4 優勢和不足5 線上環境的結果參考文獻
阿裡巴巴大規模應用Flink的踩坑經驗:如何大幅降低 HDFS 壓力?背景1 小檔案合并方案2 設計實作3 相容性4 優勢和不足5 線上環境的結果參考文獻
阿裡巴巴大規模應用Flink的踩坑經驗:如何大幅降低 HDFS 壓力?背景1 小檔案合并方案2 設計實作3 相容性4 優勢和不足5 線上環境的結果參考文獻
阿裡巴巴大規模應用Flink的踩坑經驗:如何大幅降低 HDFS 壓力?背景1 小檔案合并方案2 設計實作3 相容性4 優勢和不足5 線上環境的結果參考文獻

參考文獻

[1]

https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html

[2]

https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html

[3]

https://www.slideshare.net/dataArtisans/stephan-ewen-experiences-running-flink-at-very-large-scale

[4]

https://issues.apache.org/jira/browse/FLINK-11937

[5]

https://issues.apache.org/jira/browse/FLINK-8871

繼續閱讀