開發者學堂課程【Apache Flink 2021 最新入門課程:Fault-tolerance in Flink】學習筆記,與課程緊密聯系,讓使用者快速學習知識。
課程位址:
https://developer.aliyun.com/learning/course/58/detail/1071Fault-tolerance in Flink
内容介紹:
一、有狀态的流計算
二、全局一緻性快照
三、Flink的容錯機制
四、Flink的狀态管理
1.流計算
流計算是指有一個資料源可以持續不斷地發送消息,同時有一個常駐程式運作代碼,從資料源拿到一個消息後會進行處理,然後把結果輸出到下遊。
2.分布式流計算
分布式流計算是指把輸入流以某種方式進行一個劃分,再使用多個分布式執行個體對流進行處理。
3.流計算中的狀态
計算可以分成有狀态和無狀态兩種,無狀态的計算隻需要處理單一事件,有狀态的計算需要記錄并處理多個事件。
舉個簡單的例子:
例如一個事件由事件ID和事件值兩部分組成,如果處理邏輯是每拿到一個事件,都解析并輸出它的事件值,那麼這就是一個無狀态的計算;相反,如果每拿到一個狀态,解析它的值出來後,需要和前一個事件值進行比較,比前一個事件值大的時候才把它進行輸出,這就是一個有狀态的計算。
流計算中的狀态有很多種。比如在去重的場景下,會記錄所有的主鍵;又或者在視窗計算裡,已經進入視窗還沒觸發的資料,這也是流計算的狀态。
在機器學習/深度學習場景裡,訓練的模型及參數資料都是流計算的狀态。
全局一緻性快照是可以用來給分布式系統做備份和故障恢複的機制。
1.全局快照
什麼是全局快照
全局快照首先是一個分布式應用,它有多個程序分布在多個伺服器上;
其次,它在應用内部有自己的處理邏輯和狀态;
第三,應用間是可以互相通信的;
第四,在這種分布式的應用,有内部狀态,硬體可以通信的情況下,某一時刻的全局狀态,就叫做全局的快照。
為什麼需要全局快照
第一,用它來做檢查點,可以定期對全局狀态做備份,當應用程式故障時,就可以拿來恢複;
第二,做死鎖檢測,進行快照後目前的程式繼續運作,然後可以對快照進行分析,看應用程式是不是存在死鎖狀态,如果是就可以進行相應的處理。
全局快照舉例
下圖為分布式系統中全局快照的示例。
P1和P2是兩個程序,它們之間有消息發送的管道,分别是C12和C21。
對于 P1程序來說, C12是它發送消息的管道,稱作output channel; C21是它接收消息的管道,稱作 input channel。
除了管道,每個程序都有一個本地的狀态。
比如說P1和P2每個程序的記憶體裡都有XYZ三個變量和相應的值。
那麼 P1和P2程序的本地狀态和它們之間發送消息的管道狀态,就是一個初始的全局狀态,也可稱為全局快照。
假設P1給P2發了一條消息,讓P2把x的狀态值從4改為7,但是這個消息在管道中,還沒到達P2。
這個狀态也是一個全局快照。
再接下來,P2收到了P1的消息,但是還沒有處理,這個狀态也是一個全局快照。
最後接到消息的P2把本地的X的值從4改為7,這也是一個全局快照。
是以當有事件發生的時候,全局的狀态就會發生改變。事件包括程序發送消息、程序接收消息和程序修改自己的狀态。
2.全局一緻性快照
當事件發生時,全局的狀态會發生改變,這裡的事件包括:
-程序發送消息
-程序接收到消息
-程序修改狀态
a->b代表在絕對時鐘(real time)下a happened before b,則當一個全局快照滿足下述條件時,我們稱其為一個全局一緻性快照︰
-如果A->B且B被包含在該快照中,則A也被包含在這個快照中
假如說有兩個事件,a和b,在絕對時間下,如果a發生在b之前,且b被包含在快照當中,那麼則a也被包含在快照當中。
滿足這個條件的全局快照,就稱為全局一緻性快照。
3.全局一緻性快照的實作方法
時鐘同步并不能實作全局一緻性快照;全局同步雖然可以實作,但是它的缺點也非常明顯,它會讓所有應用程式都停下來,會影響全局的性能。
4.異步全局一緻性快照算法 – Chandy-Lamport
異步全局一緻性快照算法Chandy-Lamport可以在不影響應用程式運作的前提下,實作全局一緻性快照。
Chandy-Lamport的系統要求有以下幾點:
第一,不影響應用運作,也就是不影響收發消息,不需要停止應用程式;
第二,每個程序都可以記錄本地狀态;
第三,可以分布式地對已記錄的狀态進行收集;
第四,任意程序都可以發起快照
同時,Chandy-Lamport算法可以執行還有一個前提條件:消息有序且不重複,并且消息可靠性可保障。
Chandy-Lamport算法流程
Chandy-Lamport的算法流程主要分為三個部分:
發起快照、分布式的執行快照和終止快照 。
發起快照
任意程序都可以發起快照。
如下圖所示,當由P1發起快照的時候,第一步需要記錄本地的狀态,也就是對本地進行快照,然後立刻向它所有 output channel發送一個marker消息,這中間是沒有時間間隙的。
marker消息是一個特殊的消息,它不同于應用之間傳遞的消息。
發出Marker消息後,P1就會開始記錄所有input channel的消息,也就是圖示C21管道的消息。
分布式的執行快照
如下圖,先假定當 Pi接收到來自Cki的marker消息。
也就是Pk發給Pi的marker消息。可以分兩種情況來看:
第一種情況:
這個是Pi收到的第一個來自其它管道的marker消息,它會先記錄一下本地的狀态,再把 C12管道記為空,也就是說後續再從 P1發消息,就不包含在此次快照裡了,與此同時立刻向它所有output channel發送marker消息。
最後開始記錄來自除Cki之外的所有input channel的消息。
上面提到Cki消息不包含在實時快照裡,但是實時消息還是會發生,是以第二種情況是,如果此前Pi已經接收過marker消息,它會停止記錄 Cki消息,同時會将此前記錄的所有Cki消息作為Cki在本次快照中的最終狀态來儲存。
終止快照
終止快照的條件有兩個:
第一,所有程序都已經接收到marker消息,并記錄在本地快照;
第二,所有程序都從它的n-1個input channel裡收到了marker 消息,并記錄了管道狀态。
當快照終止,快照收集器 (Central Server) 就開始收集每一個部分的快照去形成全局一緻性快照了。
示例展示
在下圖的例子裡,一些狀态是在内部發生的,比如A,它跟其它程序沒有互動。内部狀态就是 P1發給自己消息,可以将A認為是C11=[A->]。
Chandy-Lamport全局一緻性快照的算法是怎麼執行的呢?
假設從p1來發起快照,它發起快照時,首先對本地的狀态進行快照,稱之為S1,然後立刻向它所有的output channel,即P2和P3,分别發marker消息,然後再去記錄它所有input channel的消息,即來自P2和P3及自身的消息。
圖例所示,縱軸是絕對時間,按照絕對時間來看,為什麼P3和P2收到marker消息會有時間差呢?
因為假如這是一個真實的實體環境裡的分布式程序,不同節點之間的網絡狀況是不一樣的,這種情況會導緻消息送達時間存在差異。
P3先收到marker消息,且是它接收到的第一個marker消息。
接收到消息後,它首先會對本地狀态進行快照,然後把 C13管道的标記成 close,與此同時開始向它所有的output channel發送 marker消息。
最後它會把來自除了C13之外的所有input channel的消息開始進行記錄。
接收到P3發出的marker資訊的是P1,但這不是它接收的第一個marker,它會把來自C31 channel的管道立刻關閉,并且把目前的記錄消息做這個channel的快照,後續再接收到來自P3的消息,就不會更新在此次的快照狀态裡了。
接下來P2接收到來自P3的消息,這是它接到的第一個marker消息。
接收到消息後,它首先對本地狀态進行快照,然後把 C32管道的标記成 close,與此同時開始向它所有的output channel發送 marker消息,最後它會把來自除了C32之外的所有input channel的消息開始進行記錄。
再來看P2接收到來自P1的消息,這不是P2接收到的第一個marker消息,是以它會把所有的 input channel全部關閉,并且記錄channel的狀态。
接下來看P1接收到來自P2的消息,這也不是它接收的第一個消息。那麼它就會把所有的input channel關閉,并把記錄的消息作為狀态。那麼這裡面有兩個狀态,一個是C11,即自己發給自己的消息;一個是C21,是P2裡H發給P1D的。
最後一個時間點,P3接收到來自P2的消息,這也不是它收到的第一個消息,操作跟上面介紹的一樣。
在這期間P3本地有一個事件J,它也會把J作為它的狀态。
當所有程序都記錄了本地狀态,而且每一個程序的所有輸入管道都已經關閉了,那麼全局一緻性快照就結束了,也就是對過去時間點的全局性的狀态記錄完成了。
Chandy-Lamport與 Flink之間的關系
Flink 是分布式系統,是以 Flink 會采用全局一緻性快照的方式形成檢查點,來支援故障恢複。
Flink的異步全局一緻性快照算法跟Chandy-Lamport算法的差別主要有以下幾點:
第一,Chandy-Lamput支援強連通圖,而 Flink支援弱連通圖;
第二,Flink采用的是裁剪的(Tailored)Chandy-Lamput異步快照算法;
第三,Flink的異步快照算法在DAG場景下不需要存儲Channel state,進而極大減少快照的存儲空間。
容錯,就是恢複到出錯前的狀态。流計算容錯一緻性保證有三種,分别是:
Exactly once,At least once,At most once。
1.Exactly once,是指每條event會且隻會對state産生一次影響,這裡的“一次”并非端到端的嚴格一次,而是指在 Flink内部隻處理一次,不包括source和sink的處理。
2.At least once,是指每條event會對state産生最少一次影響,也就是存在重複處理的可能。
3.At most once,是指每條event會對state産生最多一次影響,就是狀态可能會在出錯時丢失。
端到端的Exactly once
Exactly once的意思是,作業結果總是正确的,但是很可能産出多次;是以它的要求是需要有可重放的source。
端到端的Exactly once,是指作業結果正确且隻會被産出一次,它的要求除了有可重放的source外,還要求有事務型的sink和可以接收幂等的産出結果。
Flink的狀态容錯
簡單場景的 Exactly Once 容錯方法
簡單場景的做法如下圖,方法就是,記錄本地狀态并且把 source的offset,即 Event log的位置記錄下來就好了。
分布式場景的狀态容錯
如果是分布式場景,我們需要在不中斷運算的前提下對多個擁有本地狀态的算子産生全局一緻性快照。
Flink 分布式場景的作業拓撲比較特殊,它是有向無環并且是弱聯通圖,可以采用裁剪的Chandy-Lamport,也就是隻記錄所有輸入的offset和各個算子狀态,并依賴rewindable source(可回溯的source,即可以通過offset讀取比較早一點時間點),進而不需要存儲channel的狀态,這在存在聚合 (aggregation)邏輯的情況下可以節省大量的存儲空間。
最後做恢複,恢複就是把資料源的位置重新設定,然後每一個算子都從檢查點恢複狀态。
3.Flink 的分布式快照方法
首先在源資料流裡插入Checkpoint barrier,也就是上文提到的Chandy-Lamport算法裡的marker message,不同的Checkpoint barrier會把流自然地切分多個段,每個段都包含了Checkpoint的資料;
Flink 裡有一個全局的 Coordinator,它不像 Chandy-Lamport 對任意一個程序都可以發起快照,這個集中式的 Coordinator會把 Checkpoint barrier 注入到每個 source 裡,然後啟動快照。
當每個節點收到 barrier 後,因為 Flink 裡面它不存儲 Channel state,是以它隻需存儲本地的狀态就好。
在做完了Checkpoint 後,每個算子的每個并發都會向Coordinator發送一個确認消息,當所有任務的确認消息都被Checkpoint Coordinator接收,快照就結束了。
4.流程示範
見下圖示,假設Checkpoint N 被注入到 source裡,這時source會先把它正在處理分區的offset記錄下來。
随着時間的流逝,它會把Checkpoint barrier發送到兩個并發的下遊,當barrier分别到達兩個并發,這兩個并發會分别把它們本地的狀态都記錄在Checkpoint 的裡:
最後barrier到達最終的subtask,快照就完成了。
這是比較簡單的場景示範,每個算子隻有單流的輸入,再來看下圖比較複雜的場景,算子有多流輸入的情況。
當算子有多個輸入,需要把Barrier 對齊。怎麼把Barrier對齊呢?
如下圖所示,在左側原本的狀态下,當其中一條barrier到達,另一條barrier指令上有的barrier還在管道中沒有到達,這時會在保證Exactly once的情況下,把先到達的流直接阻塞掉,然後等待另一條流的資料處理。等到另外一條流也到達了,會把之前的流unblock,同時把barrier發送到算子。
在這個過程中,阻塞掉其中一條流的作用是,會讓它産生反壓。Barrier 對齊會導緻反壓和暫停operator的資料處理。
如果不在對齊過程中阻塞已收到barrier的資料管道,資料持續不斷流進來,那麼屬于下個Checkpoint的資料被包含在目前的Checkpoint裡,如果一旦發生故障恢複後,由于source會被rewind,部分資料會有重複處理,這就是at-least-once。 如果能接收at-least-once,那麼可以選擇其他可以避免barrier對齊帶來的副作用。
另外也可以通過異步快照來盡量減少任務停頓并支援多個Checkpoint同時進行。
5.快照觸發
本地快照同步上傳到系統需要state Copy-on-write的機制。
假如對中繼資料資訊做了快照之後資料處理恢複了,在上傳資料的過程中如何保證恢複的應用程式邏輯不會修改正在上傳的資料呢?
實際上不同狀态存儲後端的處理是不一樣的,Heap backend會觸發資料的copy-on-write,而對于RocksDB backend來說LSM的特性可以保證已經快照的資料不會被修改。
四、Flink 的狀态管理
1.Flink 狀态管理
首先需要去定義一個狀态,在下圖的例子裡,先定義一個Value state。
在定義的狀态的時候,需要給出以下的幾個資訊:
1.狀态識别ID
2.狀态資料類型
3.本地狀态後端注冊狀态
4.本地狀态後端讀寫狀态
2.Flink 狀态後端
也稱作state backend,Flink狀态後端有兩種;
第一種,JVM Heap,它裡面的資料是以Java對象形式存在的,讀寫也是以對象形式去完成的,是以速度很快。
但是也存在兩個弊端:第一個弊端,以對象方式存儲所需的空間是磁盤上序列化壓縮後的資料大小的很多倍,是以占用的記憶體空間很大;第二個弊端,雖然讀寫不用做序列化,但是在形成snapshot時需要做序列化,是以它的異步snapshot過程會比較慢。
第二種,RocksDB,這個類型在讀寫時就需要做序列化,是以它讀寫的速度比較慢。
但是它有一個好處,基于 LSM 的資料結構在快照之後會形成sst檔案,它的異步 checkpoint 過程就是檔案拷貝的過程,CPU 消耗會比較低。