天天看點

Flink 學習(一):容錯機制IntroduceCheckpointingBarriersStateExactly Once vs. At Least OnceAsynchronous State SnapshotsRecoveryOperator Snapshot Implementation 

Introduce

Apache Flink 提供了可以恢複資料流應用到一緻狀态的容錯機制。確定在發生故障時,程式恢複時,資料流的每一條記錄隻會被處理一次(exactly-once),當然也可以降級為至少處理一次(at-least-once)。

容錯機制通過持續建立分布式資料流的快照來實作。對于狀态占用空間小的流應用,這些快照非常輕量,可以高頻率建立而對性能影響很小。流計算應用的狀态儲存在一個可配置的環境,如:master 節點或者 HDFS上。

在遇到程式故障時(如機器、網絡、軟體等故障),Flink 停止分布式資料流。系統重新開機所有 operator ,重置其到最近成功的 checkpoint。輸入重置到相應的狀态快照位置。保證被重新開機的并行資料流中處理的任何一個 record 都不是 checkpoint 狀态之前的一部分。

注意:為了容錯機制生效,資料源(例如 queue 或者 broker)需要能重放資料流。Apache Kafka 有這個特性,Flink 中 Kafka 的 connector 利用了這個功能。

注意:由于 Flink 的 checkpoint 是通過分布式快照實作的,接下來我們将 snapshot 和 checkpoint 這兩個詞交替使用。

Checkpointing

Flink 容錯機制的核心就是持續建立分布式資料流及其狀态的一緻快照。這些快照在系統遇到故障時,充當可以回退的一緻性檢查點(checkpoint)。Lightweight Asynchronous Snapshots for Distributed Dataflows 描述了Flink 建立快照的機制。此論文是受分布式快照算法 Chandy-Lamport 啟發,并針對 Flink 執行模型量身定制。

Barriers

1.Flink 分布式快照的核心概念之一就是資料栅欄(barrier)。這些 barrier 被插入到資料流中,作為資料流的一部分和資料一起向下流動。

2.每一個barrier都帶有快照ID,barrier之前的資料都會進入此快照,barrier之後的資料則進入下一個快照,即一個 barrier 把資料流分割成兩部分:一部分進入到目前快照,另一部分進入下一個快照。

3.barrier不會幹擾資料流處理,多個快照可以同時建立。

4.Barrier 在資料源端插入,當 snapshot n 的 barrier 插入後,系統會記錄目前 snapshot 位置值 n (用Sn表示)。例如,在 Apache Kafka 中,這個變量表示某個分區中最後一條資料的偏移量。這個位置值 Sn 會被發送到一個稱為 checkpoint coordinator 的子產品。(即 Flink 的 JobManager)。

5.當一個中間Operator接收到barrier後,會發送barrier到屬于該barrier的Snapshot的資料流中。

等Sink Operator接收到該barrier後悔想checkpoit Coordinator确認該Snapshot,知道所有的Snapshot被确認,才算完成快照。

Flink 學習(一):容錯機制IntroduceCheckpointingBarriersStateExactly Once vs. At Least OnceAsynchronous State SnapshotsRecoveryOperator Snapshot Implementation 

align

接受多于1個輸入流的Operator在處理快照的Barrier時,需要對多輸入流進行對齊(align)操作,具體過程如上圖所示:

 1. Operator一旦從輸入流中收到快照n的barrier,它在其他所有的輸入流中都收到快照n的barrier之前,都不能繼續處理新的資料。否則,它将把屬于快照n和快照n+1的資料混起來。

 2. 收到Barrier n的資料流将被暫時擱置起來,從這些資料流中收到的資料将不會被進一步處理,而是放進一個輸入緩存中(input buffer)

 3. 當最後的資料流收到Barrier n,Operator将所有等待的輸出資料發送出去,然後發送Barrier n。

 4.在這之後,Operator将恢複處理輸入流的資料,先處理input buffer中的資料,再處理新接收的資料。

舉例:體育比賽頒獎典禮

在領獎台頒獎看作operator,1 2 3名運動員分别代表不同的輸入流中的barier n資料。

三個barrier n到達的順序為3>2>1

當第一個barrier n(即第3名運動員)到達領獎台時,還沒有頒獎(處理資料),等到其他的barrier n(第2、名運動員)都到達以後,才頒獎(處理資料)

State

operator 包含任何形式的狀态,這些狀态都必須包含在快照中。狀态有很多種形式:

  1. 使用者自定義狀态:由 transformation 函數例如( map() 或者 filter())直接建立或者修改的狀态。使用者自定義狀态可以是:轉換函數中的 Java 對象的一個簡單變量或者函數關聯的 key/value 狀态。參見 State in Streaming Applications
  2. 系統狀态:這種狀态是指作為 operator 計算中一部分緩存資料。典型例子就是: 視窗緩存(window buffers),系統收集視窗對應資料到其中,直到視窗計算和發射。

operator 在收到所有輸入資料流中的 barrier 之後,在發射 barrier 到其輸出流之前對其狀态進行快照。此時,在 barrier 之前的資料對狀态的更新已經完成,不會再依賴 barrier 之前資料。由于快照可能非常大,是以後端存儲系統可配置。預設是存儲到 JobManager 的記憶體中,但是對于生産系統,需要配置成一個可靠的分布式存儲系統(例如 HDFS)。狀态存儲完成後,operator 會确認其 checkpoint 完成,發射出 barrier 到後續輸出流。

快照現在包含了:

  • 對于并行輸入資料源:快照建立時資料流中的位置偏移
  • 對于 operator:存儲在快照中的狀态指針
Flink 學習(一):容錯機制IntroduceCheckpointingBarriersStateExactly Once vs. At Least OnceAsynchronous State SnapshotsRecoveryOperator Snapshot Implementation 
Flink 學習(一):容錯機制IntroduceCheckpointingBarriersStateExactly Once vs. At Least OnceAsynchronous State SnapshotsRecoveryOperator Snapshot Implementation 

Exactly Once vs. At Least Once

       對齊操作可能會對流程式增加延遲。通常,這種額外的延遲在幾毫秒的數量級,但是我們也遇到過延遲顯著增加的異常情況。針對那些需要對所有輸入都保持毫秒級的應用,Flink 提供了在 checkpoint 時關閉對齊的方法。當 operator 接收到一個 barrier 時,就會打一個快照,而不會等待其他 barrier。

跳過對齊操作使得即使在 barrier 到達時,Operator 依然繼續處理輸入。這就是說:operator 在 checkpoint n 建立之前,繼續處理屬于 checkpoint n+1 的資料。是以當異常恢複時,這部分資料就會重複,因為它們被包含在了 checkpoint n 中,同時也會在之後再次被處理。

注意:對齊操作隻會發生在擁有多輸入運算(join)或者多個輸出的 operator(重分區、分流)的場景下。是以,對于自由 map(), flatmap(), fliter() 等的并行操作即使在至少一次的模式中仍然會保證嚴格一次。

Asynchronous State Snapshots

我們注意到上面描述的機制意味着當 operator 向後端存儲快照時,會停止處理輸入的資料。這種同步操作會在每次快照建立時引入延遲。

我們完全可以在存儲快照時,讓 operator 繼續處理資料,讓快照存儲在背景異步運作。為了做到這一點,operator 必須能夠生成一個後續修改不影響之前狀态的狀态對象。例如 RocksDB 中使用的寫時複制( copy-on-write )類型的資料結構。

接收到輸入的 barrier 時,operator 異步快照複制出的狀态。然後立即發射 barrier 到輸出流,繼續正常的流處理。一旦背景異步快照完成,它就會向 checkpoint coordinator(JobManager)确認 checkpoint 完成。現在 checkpoint 完成的充分條件是:所有 sink 接收到了 barrier,所有有狀态 operator 都确認完成了狀态備份(可能會比 sink 接收到 barrier 晚)。

更多狀态快照參見:state backends

Recovery

在這種容錯機制下的錯誤回複很明顯:一旦遇到故障,Flink 選擇最近一個完成的 checkpoint k。系統重新部署整個分布式資料流,重置所有 operator 的狀态到 checkpoint k。資料源被置為從 Sk 位置讀取。例如在 Apache Kafka 中,意味着讓消費者從 Sk 處偏移開始讀取。

如果是增量快照,operator 需要從最新的全量快照回複,然後對此狀态進行一系列增量更新。

Operator Snapshot Implementation

當 operator 快照建立時有兩部分操作:同步操作和異步操作。

operator 和背景狀态以 Java FutureTask 的方式提供它們的快照。這個 task 包含了同步操作已經完成,異步操作還在等待的狀态(state)。異步操作在背景線程中被執行。

完全同步的 operator 傳回一個已經完成的 FutureTask 。如果異步操作需要執行,FutureTask 中的 run() 方法會被調用。

為了釋放流和其他資源的消耗,可以取消這些 task。

檢查點(Checkpoint)

Flink使用stream replay和checkpointing來實作容錯。Checkpoint通過對stream和operator都做快照(snapshot)來記錄狀态,這樣才能夠在保證在流處理系統失敗時能夠正确地恢複資料流處理。Checkpoint是Flink周期性自動做的,支援全量和增量。

儲存點(Savepoint)

Savepoint和Checkpoint類似,是用來儲存程式和Flink Cluster的State(狀态),它和Checkpoint的主要差別有兩點:

1. 手動觸發生成

2. 不會自動過期

可以通過指令行或REST API的方式來建立Savepoint。

原文位址:https://segmentfault.com/a/1190000008129552

繼續閱讀