天天看點

【Flink流式計算架構】checkpoint(容錯)_恢複資料(容錯)_savepointcheckpoint(容錯)恢複資料(容錯)

012-Flink

  • checkpoint(容錯)
    • checkpoint概述
    • checkpoint配置
  • 恢複資料(容錯)
    • 重新開機政策概述
    • 重新開機政策
    • 多checkpoint
    • 從checkpoint恢複資料
    • savepoint(重量級checkpoint)

checkpoint(容錯)

checkpoint概述

(1)為了保證state的容錯性,Flink需要對state進行checkpoint。

(2)Checkpoint是Flink實作容錯機制最核心的功能,它能夠根據配置周期性地基于Stream中各個Operator/task的狀态來生成快照,進而将這些狀态資料定期持久化存儲下來,當Flink程式一旦意外崩潰時,重新運作程式時可以有選擇地從這些快照進行恢複,進而修正因為故障帶來的程式資料異常

(3)Flink的checkpoint機制可以與(stream和state)的持久化存儲互動的前提:

持久化的source,它需要支援在一定時間内重放事件。這種sources的典型例子是持久化的消息隊列(比如Apache Kafka,RabbitMQ等)或檔案系統(比如HDFS,S3,GFS等)

用于state的持久化存儲,例如分布式檔案系統(比如HDFS,S3,GFS等)

生成快照5s 到 恢複快照

checkpoint配置

預設checkpoint功能是disabled的,想要使用的時候需要先啟用,checkpoint開啟之後,checkPointMode有兩種,Exactly-once和At-least-once,預設的checkPointMode是Exactly-once,Exactly-once對于大多數應用來說是最合适的。At-least-once可能用在某些延遲超低的應用程式(始終延遲為幾毫秒)。

預設checkpoint功能是disabled的,想要使用的時候需要先啟用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1000 ms進行啟動一個檢查點【設定checkpoint的周期】
env.enableCheckpointing(1000);
// 進階選項:
// 設定模式為exactly-once (這是預設值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 確定檢查點之間有至少500 ms的間隔【checkpoint最小間隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 檢查點必須在一分鐘内完成,或者被丢棄【checkpoint的逾時時間】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一時間隻允許進行一個檢查點
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink處理程式被cancel後,會保留Checkpoint資料,以便根據實際需要恢複到指定的Checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 
           

恢複資料(容錯)

重新開機政策概述

Flink支援不同的重新開機政策,以在故障發生時控制作業如何重新開機,叢集在啟動時會伴随一個預設的重新開機政策,在沒有定義具體重新開機政策時會使用該預設政策。 如果在工作送出時指定了一個重新開機政策,該政策會覆寫叢集的預設政策,預設的重新開機政策可以通過 Flink 的配置檔案 flink-conf.yaml 指定。配置參數 restart-strategy 定義了哪個政策被使用。

常用的重新開機政策

(1)固定間隔 (Fixed delay)

(2)失敗率 (Failure rate)

(3)無重新開機 (No restart)

如果沒有啟用 checkpointing,則使用無重新開機 (no restart) 政策。

如果啟用了 checkpointing,但沒有配置重新開機政策,則使用固定間隔 (fixed-delay) 政策, 嘗試重新開機次數預設值是:Integer.MAX_VALUE,重新開機政策可以在flink-conf.yaml中配置,表示全局的配置。也可以在應用代碼中動态指定,會覆寫全局配置。

重新開機政策

固定間隔 (Fixed delay)

第一種:全局配置 flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
第二種:應用代碼設定
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // 嘗試重新開機的次數
  Time.of(10, TimeUnit.SECONDS) // 間隔
));
           

失敗率 (Failure rate)

第一種:全局配置 flink-conf.yaml
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
第二種:應用代碼設定
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // 一個時間段内的最大失敗次數
  Time.of(5, TimeUnit.MINUTES), // 衡量失敗次數的是時間段
  Time.of(10, TimeUnit.SECONDS) // 間隔
));
           

無重新開機 (No restart)

第一種:全局配置 flink-conf.yaml
restart-strategy: none
第二種:應用代碼設定
env.setRestartStrategy(RestartStrategies.noRestart());
           

多checkpoint

預設情況下,如果設定了Checkpoint選項,則Flink隻保留最近成功生成的1個Checkpoint,而當Flink程式失敗時,可以從最近的這個Checkpoint來進行恢複。但是,如果我們希望保留多個Checkpoint,并能夠根據實際需要選擇其中一個進行恢複,這樣會更加靈活,比如,我們發現最近4個小時資料記錄處理有問題,希望将整個狀态還原到4小時之前Flink可以支援保留多個Checkpoint,需要在Flink的配置檔案conf/flink-conf.yaml中,添加如下配置,指定最多需要儲存Checkpoint的個數:

state.checkpoints.num-retained: 20
           

這樣設定以後就檢視對應的Checkpoint在HDFS上存儲的檔案目錄

hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints

如果希望回退到某個Checkpoint點,隻需要指定對應的某個Checkpoint路徑即可實作

從checkpoint恢複資料

如果Flink程式異常失敗,或者最近一段時間内資料處理錯誤,我們可以将程式從某一個Checkpoint點進行恢複

bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar
           

程式正常運作後,還會按照Checkpoint配置進行運作,繼續生成Checkpoint資料。

當然恢複資料的方式還可以在自己的代碼裡面指定checkpoint目錄,這樣下一次啟動的時候即使代碼發生了改變就自動恢複資料了。

savepoint(重量級checkpoint)

Flink通過Savepoint功能可以做到程式更新後,繼續從更新前的那個點開始執行計算,保證資料不中斷

全局,一緻性快照。可以儲存資料源offset,operator操作狀态等資訊,可以從應用在過去任意做了savepoint的時刻開始繼續消費

checkPoint vs savePoint

checkPoint

應用定時觸發,用于儲存狀态,會過期,内部應用失敗重新開機的時候使用。

savePoint

使用者手動執行,是指向Checkpoint的指針,不會過期,在更新的情況下使用。

注意:為了能夠在作業的不同版本之間以及 Flink 的不同版本之間順利更新,推薦程式員通過 uid(String) 方法手動的給算子賦予 ID,這些 ID 将用于确定每一個算子的狀态範圍。如果不手動給各算子指定 ID,則會由 Flink 自動給每個算子生成一個 ID。隻要這些 ID 沒有改變就能從儲存點(savepoint)将程式恢複回來。而這些自動生成的 ID 依賴于程式的結構,并且對代碼的更改是很敏感的。是以,建議使用者手動的設定 ID。

savepoint的使用

1:在flink-conf.yaml中配置Savepoint存儲位置
不是必須設定,但是設定後,後面建立指定Job的Savepoint時,可以不用在手動執行指令時指定Savepoint的位置
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
2:觸發一個savepoint【直接觸發或者在cancel的時候觸發】
bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【針對on yarn模式需要指定-yid參數】


bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【針對on yarn模式需要指定-yid參數】

flink cancel -s hdfs://xxx/savepoint jobId f849c15ee894d82f0e74297be658248bStart -yid applicationID

3:從指定的savepoint啟動job
bin/flink run -s savepointPath [runArgs]
           

繼續閱讀