天天看點

Flink的容錯

一、State 狀态 (狀态資料)

Flink 實時計算程式為了保證計算過程中,出現異常可以容錯,就要将中間的計算結果資料存儲起來,這些中間結果資料就叫做 State。

State 可以是多種類型的,預設是儲存到 JobManager 的記憶體中,也可以儲存到TaskManager 本地檔案系統或HDFS這樣的分布式檔案系統。

二、StateBackEnd(靠譜的存儲系統中)

用來儲存State 的存儲後端就叫做StateBackEnd,預設是儲存在JobManager 的記憶體中,也可以儲存到本地檔案系統或HDFS這樣的分布式檔案系統中。

三、CheckPointing(周期性的操作)

Flink 實時計算為了容錯,可以将中間資料定期儲存起來,這種定期觸發儲存中間結果的機制叫CheckPoint,CheckPoint 是周期執行的,具體的過程是 JobManager定期的

向TaskManager中的SubTask 發送RPC消息,SubTask将其計算的State儲存到 StateBackEnd中,并且向JobManager響應CheckPoint是否成功,如果程式出現異常或重新開機,

TaskManager 中的SubTask可以從上一次成功的CheckPointing 的 State 恢複。

總結:JobManager 需要周期性的讓TaskManager中的 SubTask中的State 儲存到StateBackEnd 中。

四、重新開機政策

Flink 實時計算程式,為了容錯,需要開啟CheckPointing ,一旦開啟CheckPoint,如果沒有重新開機政策,預設的重新開機政策是無限重新開機,也可以設定其他重新開機政策,

如:重新開機固定次數且可以延遲執行的政策。

五、state存儲方式,這幾種方式有什麼異同?

5.1.MemoryStateBackend

MemoryStateBackend将工作狀态資料儲存在taskManager的java記憶體中。key/value狀态和window算子使用哈希表存儲數值和觸發器。進行快照時(checkpointing),

生成得快照資料将和checkpoint ACK消息一起發送給 jobmanager,jobmanager将收到的所有快照儲存在java記憶體中。

MemoryStateBackend現在被預設配置成異步的,這樣避免阻塞主線程的pipline處理。

MemoryStateBackend的狀态存取的速度都非常快,但是又不适合在生産環境中使用。這是因為MemoryStateBackend有以下限制:

1、每個state的預設大小被限制為5MB(這個值可以通過MemoryStateBackend構造函數設定)

2、每個task的所有state資料(一個task可能包含一個pipline中的多個Operator)大小不能超過RPC系統的幀大小(akka.framesize,預設10MB)

3、jobmanager收到的state資料總和不能超過jobmananger記憶體。

MemoryStateBackend适合的場景:

本地開發和調試

狀态很小的作業

值得說明的是,當觸發savepoint時,jobmanager會把快照資料持久化的外部存儲。

5.2、FsStateBackend

FsStateBackend需要配置一個 checkpoint路徑,例如: "hdfs://namenode:40010/fink/checkpoints"或者"file:///data/flink/checkpoint" 

我們一般配置為hdfs目錄

FsStateBackend将工作狀态資料儲存在taskmanager的java記憶體中。進行快照時,再将快照資料寫入上面的路徑,然後将寫入的檔案路徑告知jobmanager。

jobmanager中儲存所有狀态的中繼資料資訊(在HA模式下,中繼資料會寫入checkpoint目錄)

FsStateBackend預設使用異步方式進行快照,防止阻塞主線程的pipline處理。可以通過FsStateBackend構造函數取消該模式

new FsStateBackEnd(path,false)

FsStateBackend适合的場景:

大狀态,長視窗,大鍵值(鍵或者值很大)狀态的作業

适合高可用方案

5.3、RocksDBStateBackend

需要配置一個 checkpoint路徑,例如: "hdfs://namenode:40010/fink/checkpoints"或者"file:///data/flink/checkpoint" 

RocksDB是一種嵌入的持久性的key-value存儲引擎,提供ACID支援。由Facebook基于levelDB開發,使用LSM存儲引擎,是記憶體和磁盤混合存儲。

RocksDBStateBackend 将工作狀态儲存在 taskmanager的RocksDB資料庫中,checkpoint時,RocksDB中的所有資料會被傳輸到配置的檔案目錄,

少量中繼資料資訊儲存在jobmanager記憶體中(HA模式下,會儲存在checkpoint目錄)

RocksDBStateBackend使用異步方式進行快照

RocksDBStateBackend的限制:

RocksDBStateBackend 适用于以下場景:

  • 超大狀态、超長視窗(天)、大鍵值狀态的作業
  • 适合高可用模式

使用 RocksDBStateBackend 時,能夠限制狀态大小的是 taskmanager 磁盤空間(相對于 FsStateBackend 狀态大小限制于 taskmanager 記憶體 )。

這也導緻 RocksDBStateBackend 的吞吐比其他兩個要低一些。因為 RocksDB 的狀态資料的讀寫都要經過反序列化/序列化。

RocksDBStateBackend 是目前三者中唯一支援增量 checkpoint 的。