天天看點

Flink的非Barrier對齊可以優化高反壓

問題導讀

1.Barrier 對齊會造成什麼問題? 

目前的 Checkpoint 算法在大多數情況下運作良好,然而當作業出現反壓時,阻塞式的 Barrier 對齊反而會加劇作業的反壓,甚至導緻作業的不穩定。

2.Barrier 對齊是否會造成反壓?

3.如何了解Unaligned Checkpoint ?

作為 Flink 最基礎也是最關鍵的容錯機制,Checkpoint 快照機制很好地保證了 Flink 應用從異常狀态恢複後的資料準确性。同時 Checkpoint 相關的 metrics 也是診斷 Flink 應用健康狀态最為重要的名額,成功且耗時較短的 Checkpoint 表明作業運作狀況良好,沒有異常或反壓。然而,由于 Checkpoint 與反壓的耦合,反壓反過來也會作用于 Checkpoint,導緻 Checkpoint 的種種問題。針對于此,Flink 在 1.11 引入 Unaligned Checkpint 來解耦 Checkpoint 機制與反壓機制,優化高反壓情況下的 Checkpoint 表現。

目前 Checkpoint 機制簡述

相信不少讀者對 Flink Checkpoint 基于 Chandy-Lamport 算法的分布式快照已經比較熟悉,該節簡單回顧下算法的基礎邏輯,熟悉算法的讀者可放心跳過。

Chandy-Lamport 算法将分布式系統抽象成 DAG(暫時不考慮有閉環的圖),節點表示程序,邊表示兩個程序間通信的管道。分布式快照的目的是記錄下整個系統的狀态,即可以分為節點的狀态(程序的狀态)和邊的狀态(信道的狀态,即傳輸中的資料)。因為系統狀态是由輸入的消息序列驅動變化的,我們可以将輸入的消息序列分為多個較短的子序列,圖的每個節點或邊先後處理完某個子序列後,都會進入同一個穩定的全局統狀态。利用這個特性,系統的程序和信道在子序列的邊界點分别進行本地快照,即使各部分的快照時間點不同,最終也可以組合成一個有意義的全局快照。

Flink的非Barrier對齊可以優化高反壓

圖1. Checkpoint Barrier

從實作上看,Flink 通過在 DAG 資料源定時向資料流注入名為 Barrier 的特殊元素,将連續的資料流切分為多個有限序列,對應多個 Checkpoint 周期。每當接收到 Barrier,算子進行本地的 Checkpoint 快照,并在完成後異步上傳本地快照,同時将 Barrier 以廣播方式發送至下遊。當某個 Checkpoint 的所有 Barrier 到達 DAG 末端且所有算子完成快照,則标志着全局快照的成功。

圖2. Barrier Alignment

在有多個輸入 Channel 的情況下,為了資料準确性,算子會等待所有流的 Barrier 都到達之後才會開始本地的快照,這種機制被稱為 Barrier 對齊。在對齊的過程中,算子隻會繼續處理的來自未出現 Barrier Channel 的資料,而其餘 Channel 的資料會被寫入輸入隊列,直至在隊列滿後被阻塞。當所有 Barrier 到達後,算子進行本地快照,輸出 Barrier 到下遊并恢複正常處理。

比起其他分布式快照,該算法的優勢在于輔以 Copy-On-Write 技術的情況下不需要 “Stop The World” 影響應用吞吐量,同時基本不用持久化進行中的資料,隻用儲存程序的狀态資訊,大大減小了快照的大小。

Checkpoint 與反壓的耦合

目前的 Checkpoint 算法在大多數情況下運作良好,然而當作業出現反壓時,阻塞式的 Barrier 對齊反而會加劇作業的反壓,甚至導緻作業的不穩定。

首先, Chandy-Lamport 分布式快照的結束依賴于 Marker 的流動,而反壓則會限制 Marker 的流動,導緻快照的完成時間變長甚至逾時。無論是哪種情況,都會導緻 Checkpoint 的時間點落後于實際資料流較多。這時作業的計算進度是沒有被持久化的,處于一個比較脆弱的狀态,如果作業出于異常被動重新開機或者被使用者主動重新開機,作業會復原丢失一定的進度。如果 Checkpoint 連續逾時且沒有很好的監控,復原丢失的進度可能高達一天以上,對于實時業務這通常是不可接受的。更糟糕的是,復原後的作業落後的 Lag 更大,通常帶來更大的反壓,形成一個惡性循環。

其次,Barrier 對齊本身可能成為一個反壓的源頭,影響上遊算子的效率,而這在某些情況下是不必要的。比如典型的情況是一個的作業讀取多個 Source,分别進行不同的聚合計算,然後将計算完的結果分别寫入不同的 Sink。通常來說,這些不同的 Sink 會複用公共的算子以減少重複計算,但并不希望不同 Source 間互相影響。

Flink的非Barrier對齊可以優化高反壓

圖3. Barrier Alignment 阻塞上遊 Task

假設一個作業要分别統計 A 和 B 兩個業務線的以天為粒度名額,同時還需要統計所有業務線以周為機關的名額,拓撲如上圖所示。如果 B 業務線某天的業務量突漲,使得 Checkpoint Barrier 有延遲,那麼會導緻公用的 Window Aggregate 進行 Barrier 對齊,進而阻塞業務 A 的 FlatMap,最終令業務 A 的計算也出現延遲。

當然這種情況可以通過拆分作業等方式優化,但難免引入更多開發維護成本,而且更重要的是這本來就符合 Flink 使用者正常的開發思路,應該在架構内盡量減小出現使用者意料之外的行為的可能性。

Unaligned Checkpoint

為了解決這個問題,Flink 在 1.11 版本引入了 Unaligned Checkpoint 的特性。要了解 Unaligned Checkpoint 的原理,首先需要了解 Chandy-Lamport 論文中對于 Marker 處理規則的描述:

Flink的非Barrier對齊可以優化高反壓

圖4. Chandy-Lamport Marker 處理

其中關鍵是 if q has not recorded its state,也就是接收到 Marker 時算子是否已經進行過本地快照。一直以來 Flink 的 Aligned Checkpoint 通過 Barrier 對齊,将本地快照延遲至所有 Barrier 到達,因而這個條件是永真的,進而巧妙地避免了對算子輸入隊列的狀态進行快照,但代價是比較不可控的 Checkpoint 時長和吞吐量的降低。實際上這和 Chandy-Lamport 算法是有一定出入的。

舉個例子,假設我們對兩個資料流進行 equal-join,輸出比對上的元素。按照 Flink Aligned Checkpoint 的方式,系統的狀态變化如下(圖中不同顔色的元素代表屬于不同的 Checkpoint 周期):

圖5. Aligned Checkpoint 狀态變化

圖 a: 輸入 Channel 1 存在 3 個元素,其中 2 在 Barrier 前面;Channel 2 存在 4 個元素,其中 2、9、7 在 Barrier 前面。

圖 b: 算子分别讀取 Channel 一個元素,輸出 2。随後接收到 Channel 1 的 Barrier,停止處理 Channel 1 後續的資料,隻處理 Channel 2 的資料。

圖 c: 算子再消費 2 個自 Channel 2 的元素,接收到 Barrier,開始本地快照并輸出 Barrier。

對于相同的情況,Chandy-Lamport 算法的狀态變化如下:

圖6. Chandy-Lamport 狀态變化

圖 a: 同上。

圖 b: 算子分别處理兩個 Channel 一個元素,輸出結果 2。此後接收到 Channel 1 的 Barrier,算子開始本地快照記錄自己的狀态,并輸出 Barrier。

圖 c: 算子繼續正常處理兩個 Channel 的輸入,輸出 9。特别的地方是 Channel 2 後續元素會被儲存下來,直到 Channel 2 的 Barrier 出現(即 Channel 2 的 9 和 7)。儲存的資料會作為 Channel 的狀态成為快照的一部分。

兩者的差異主要可以總結為兩點:

快照的觸發是在接收到第一個 Barrier 時還是在接收到最後一個 Barrier 時。

是否需要阻塞已經接收到 Barrier 的 Channel 的計算。

從這兩點來看,新的 Unaligned Checkpoint 将快照的觸發改為第一個 Barrier 且取消阻塞 Channel 的計算,算法上與 Chandy-Lamport 基本一緻,同時在實作細節方面結合 Flink 的定位做了幾個改進。

首先,不同于 Chandy-Lamport 模型的隻需要考慮算子輸入 Channel 的狀态,Flink 的算子有輸入和輸出兩種 Channel,在快照時兩者的狀态都需要被考慮。

其次,無論在 Chandy-Lamport 還是 Flink Aligned Checkpoint 算法中,Barrier 都必須遵循其在資料流中的位置,算子需要等待 Barrier 被實際處理才開始快照。而 Unaligned Checkpoint 改變了這個設定,允許算子優先攝入并優先輸出 Barrier。如此一來,第一個到達 Barrier 會在算子的緩存資料隊列(包括輸入 Channel 和輸出 Channel)中往前跳躍一段距離,而被”插隊”的資料和其他輸入 Channel 在其 Barrier 之前的資料會被寫入快照中(圖中黃色部分)。

Flink的非Barrier對齊可以優化高反壓

圖7. Barrier 越過資料

這樣的主要好處是,如果本身算子的處理就是瓶頸,Chandy-Lamport 的 Barrier 仍會被阻塞,但 Unaligned Checkpoint 則可以在 Barrier 進入輸入 Channel 就馬上開始快照。這可以從很大程度上加快 Barrier 流經整個 DAG 的速度,進而降低 Checkpoint 整體時長。

回到之前的例子,用 Unaligned Checkpoint 來實作,狀态變化如下:

圖8. Unaligned-Checkpoint 狀态變化

圖 a: 輸入 Channel 1 存在 3 個元素,其中 2 在 Barrier 前面;Channel 2 存在 4 個元素,其中 2、9、7 在 Barrier 前面。輸出 Channel 已存在結果資料 1。

圖 b: 算子優先處理輸入 Channel 1 的 Barrier,開始本地快照記錄自己的狀态,并将 Barrier 插到輸出 Channel 末端。

圖 c: 算子繼續正常處理兩個 Channel 的輸入,輸出 2、9。同時算子會将 Barrier 越過的資料(即輸入 Channel 1 的 2 和輸出 Channel 的 1)寫入 Checkpoint,并将輸入 Channel 2 後續早于 Barrier 的資料(即 2、9、7)持續寫入 Checkpoint。

比起 Aligned Checkpoint 中不同 Checkpoint 周期的資料以算子快照為界限分隔得很清晰,Unaligned Checkpoint 進行快照和輸出 Barrier 時,部分本屬于目前 Checkpoint 的輸入資料還未計算(是以未反映到目前算子狀态中),而部分屬于目前 Checkpoint 的輸出資料卻落到 Barrier 之後(是以未反映到下遊算子的狀态中)。這也正是 Unaligned 的含義: 不同 Checkpoint 周期的資料沒有對齊,包括不同輸入 Channel 之間的不對齊,以及輸入和輸出間的不對齊。而這部分不對齊的資料會被快照記錄下來,以在恢複狀态時重放。換句話說,從 Checkpoint 恢複時,不對齊的資料并不能由 Source 端重放的資料計算得出,同時也沒有反映到算子狀态中,但因為它們會被 Checkpoint 恢複到對應 Channel 中,是以依然能提供隻計算一次的準确結果。

當然,Unaligned Checkpoint 并不是百分百優于 Aligned Checkpoint,它會帶來的已知問題就有:

由于要持久化緩存資料,State Size 會有比較大的增長,磁盤負載會加重。

随着 State Size 增長,作業恢複時間可能增長,運維管理難度增加。

目前看來,Unaligned Checkpoint 更适合容易産生高反壓同時又比較重要的複雜作業。對于像資料 ETL 同步等簡單作業,更輕量級的 Aligned Checkpoint 顯然是更好的選擇。

總結

Flink 1.11 的 Unaligned Checkpoint 主要解決在高反壓情況下作業難以完成 Checkpoint 的問題,同時它以磁盤資源為代價,避免了 Checkpoint 可能帶來的阻塞,有利于提升 Flink 的資源使用率。随着流計算的普及,未來的 Flink 應用大概會越來越複雜,在未來經過實戰打磨完善後 Unaligned Checkpoint 很有可能會取代 Aligned Checkpoint 成為 Flink 的預設 Checkpoint 政策。

轉載:https://www.sohu.com/a/407428726_797717

繼續閱讀