012-Flink
- checkpoint(容錯)
-
- checkpoint概述
- checkpoint配置
- 恢複資料(容錯)
-
- 重新開機政策概述
- 重新開機政策
- 多checkpoint
- 從checkpoint恢複資料
- savepoint(重量級checkpoint)
checkpoint(容錯)
checkpoint概述
(1)為了保證state的容錯性,Flink需要對state進行checkpoint。
(2)Checkpoint是Flink實作容錯機制最核心的功能,它能夠根據配置周期性地基于Stream中各個Operator/task的狀态來生成快照,進而将這些狀态資料定期持久化存儲下來,當Flink程式一旦意外崩潰時,重新運作程式時可以有選擇地從這些快照進行恢複,進而修正因為故障帶來的程式資料異常
(3)Flink的checkpoint機制可以與(stream和state)的持久化存儲互動的前提:
持久化的source,它需要支援在一定時間内重放事件。這種sources的典型例子是持久化的消息隊列(比如Apache Kafka,RabbitMQ等)或檔案系統(比如HDFS,S3,GFS等)
用于state的持久化存儲,例如分布式檔案系統(比如HDFS,S3,GFS等)
生成快照5s 到 恢複快照
checkpoint配置
預設checkpoint功能是disabled的,想要使用的時候需要先啟用,checkpoint開啟之後,checkPointMode有兩種,Exactly-once和At-least-once,預設的checkPointMode是Exactly-once,Exactly-once對于大多數應用來說是最合适的。At-least-once可能用在某些延遲超低的應用程式(始終延遲為幾毫秒)。
預設checkpoint功能是disabled的,想要使用的時候需要先啟用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1000 ms進行啟動一個檢查點【設定checkpoint的周期】
env.enableCheckpointing(1000);
// 進階選項:
// 設定模式為exactly-once (這是預設值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 確定檢查點之間有至少500 ms的間隔【checkpoint最小間隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 檢查點必須在一分鐘内完成,或者被丢棄【checkpoint的逾時時間】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一時間隻允許進行一個檢查點
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink處理程式被cancel後,會保留Checkpoint資料,以便根據實際需要恢複到指定的Checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
恢複資料(容錯)
重新開機政策概述
Flink支援不同的重新開機政策,以在故障發生時控制作業如何重新開機,叢集在啟動時會伴随一個預設的重新開機政策,在沒有定義具體重新開機政策時會使用該預設政策。 如果在工作送出時指定了一個重新開機政策,該政策會覆寫叢集的預設政策,預設的重新開機政策可以通過 Flink 的配置檔案 flink-conf.yaml 指定。配置參數 restart-strategy 定義了哪個政策被使用。
常用的重新開機政策
(1)固定間隔 (Fixed delay)
(2)失敗率 (Failure rate)
(3)無重新開機 (No restart)
如果沒有啟用 checkpointing,則使用無重新開機 (no restart) 政策。
如果啟用了 checkpointing,但沒有配置重新開機政策,則使用固定間隔 (fixed-delay) 政策, 嘗試重新開機次數預設值是:Integer.MAX_VALUE,重新開機政策可以在flink-conf.yaml中配置,表示全局的配置。也可以在應用代碼中動态指定,會覆寫全局配置。
重新開機政策
固定間隔 (Fixed delay)
第一種:全局配置 flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
第二種:應用代碼設定
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 嘗試重新開機的次數
Time.of(10, TimeUnit.SECONDS) // 間隔
));
失敗率 (Failure rate)
第一種:全局配置 flink-conf.yaml
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
第二種:應用代碼設定
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 一個時間段内的最大失敗次數
Time.of(5, TimeUnit.MINUTES), // 衡量失敗次數的是時間段
Time.of(10, TimeUnit.SECONDS) // 間隔
));
無重新開機 (No restart)
第一種:全局配置 flink-conf.yaml
restart-strategy: none
第二種:應用代碼設定
env.setRestartStrategy(RestartStrategies.noRestart());
多checkpoint
預設情況下,如果設定了Checkpoint選項,則Flink隻保留最近成功生成的1個Checkpoint,而當Flink程式失敗時,可以從最近的這個Checkpoint來進行恢複。但是,如果我們希望保留多個Checkpoint,并能夠根據實際需要選擇其中一個進行恢複,這樣會更加靈活,比如,我們發現最近4個小時資料記錄處理有問題,希望将整個狀态還原到4小時之前Flink可以支援保留多個Checkpoint,需要在Flink的配置檔案conf/flink-conf.yaml中,添加如下配置,指定最多需要儲存Checkpoint的個數:
state.checkpoints.num-retained: 20
這樣設定以後就檢視對應的Checkpoint在HDFS上存儲的檔案目錄
hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints
如果希望回退到某個Checkpoint點,隻需要指定對應的某個Checkpoint路徑即可實作
從checkpoint恢複資料
如果Flink程式異常失敗,或者最近一段時間内資料處理錯誤,我們可以将程式從某一個Checkpoint點進行恢複
bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar
程式正常運作後,還會按照Checkpoint配置進行運作,繼續生成Checkpoint資料。
當然恢複資料的方式還可以在自己的代碼裡面指定checkpoint目錄,這樣下一次啟動的時候即使代碼發生了改變就自動恢複資料了。
savepoint(重量級checkpoint)
Flink通過Savepoint功能可以做到程式更新後,繼續從更新前的那個點開始執行計算,保證資料不中斷
全局,一緻性快照。可以儲存資料源offset,operator操作狀态等資訊,可以從應用在過去任意做了savepoint的時刻開始繼續消費
checkPoint vs savePoint
checkPoint
應用定時觸發,用于儲存狀态,會過期,内部應用失敗重新開機的時候使用。
savePoint
使用者手動執行,是指向Checkpoint的指針,不會過期,在更新的情況下使用。
注意:為了能夠在作業的不同版本之間以及 Flink 的不同版本之間順利更新,推薦程式員通過 uid(String) 方法手動的給算子賦予 ID,這些 ID 将用于确定每一個算子的狀态範圍。如果不手動給各算子指定 ID,則會由 Flink 自動給每個算子生成一個 ID。隻要這些 ID 沒有改變就能從儲存點(savepoint)将程式恢複回來。而這些自動生成的 ID 依賴于程式的結構,并且對代碼的更改是很敏感的。是以,建議使用者手動的設定 ID。
savepoint的使用
1:在flink-conf.yaml中配置Savepoint存儲位置
不是必須設定,但是設定後,後面建立指定Job的Savepoint時,可以不用在手動執行指令時指定Savepoint的位置
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
2:觸發一個savepoint【直接觸發或者在cancel的時候觸發】
bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【針對on yarn模式需要指定-yid參數】
bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【針對on yarn模式需要指定-yid參數】
flink cancel -s hdfs://xxx/savepoint jobId f849c15ee894d82f0e74297be658248bStart -yid applicationID
3:從指定的savepoint啟動job
bin/flink run -s savepointPath [runArgs]