天天看點

Flink checkpoin機制及源碼分析講解

作者:不蓋被子的牛油果
Flink checkpoin機制及源碼分析講解

Apache Flink 是一個流處理架構,它的 checkpoin機制可以保證應用程式的容錯性。在本文中,我們将對 Flink 的 checkpoint 機制進行深入講解,同時通過源碼分析,了解它的實作細節。

Checkpoint 機制

Checkpoint 是指将應用程式的狀态儲存到外部存儲媒體中,以便在應用程式失敗時恢複狀态。在 Flink 中,checkpoint 機制由 JobManager 和 TaskManager 共同協作完成。JobManager 是 Flink 中的控制節點,負責管理應用程式的任務執行、資料分發等工作,TaskManager 則是具體執行任務的節點。當應用程式進行 checkpoint 時,JobManager 會向所有 TaskManager 發送消息,通知它們進行 checkpoint。每個 TaskManager 會将它所管理的任務的狀态儲存到外部存儲媒體中。

Checkpoint 機制可以保證 Flink 應用程式的容錯性。當應用程式執行過程中發生故障時,它可以通過從外部存儲媒體中讀取 checkpoint 來恢複狀态。這種機制可以有效地減少應用程式的停機時間,提高應用程式的可用性。

在 Flink 中,checkpoint 機制是可配置的。應用程式可以設定 checkpoint 的頻率和持久化方式。例如,應用程式可以設定每隔 1 分鐘進行一次 checkpoint,并将狀态儲存到 HDFS 中。下面我們将詳細講解 Flink 的 checkpoint 機制的實作細節。

Checkpoint 的實作細節

Checkpoint 生命周期

在 Flink 中,一個 checkpoint 包括三個階段:

  1. 前置操作(Pre-checkpoint):在此階段,JobManager 向 TaskManager 發送消息,通知它們準備進行 checkpoint。TaskManager 會對其管理的任務進行一些準備工作,如将任務的狀态儲存到記憶體中,暫停任務的執行等。
  2. Checkpoint 操作:在此階段,TaskManager 會将其管理的任務的狀态儲存到外部存儲媒體中。JobManager 會等待所有 TaskManager 完成此操作。
  3. 後置操作(Post-checkpoint):在此階段,JobManager 會将所有 TaskManager 儲存的狀态合并起來,并将其儲存到外部存儲媒體中。完成後,任務會繼續執行。

Checkpoint 的實作方式

Flink 的 checkpoint 機制有兩種實作方式:異步快照(Asynchronous Checkpointing)和同步快照(Synchronous Checkpointing)。兩者的差別在于 TaskManager 執行 checkpoint 的時間點。

在異步快照中,TaskManager 可以在任意時間點執行 checkpoint。JobManager 在收到所有 TaskManager 的 checkpoint 後,會将狀态合并并儲存到外部存儲媒體中。異步快照可以減少應用程式的停機時間,但是可能會導緻資料不

一緻性問題。例如,如果一個任務在 TaskManager 執行 checkpoint 前完成了一些操作,但在 checkpoint 完成後失敗了,那麼這些操作将會丢失。

在同步快照中,TaskManager 需要在一個全局的時間點同時執行 checkpoint。JobManager 會等待所有 TaskManager 完成 checkpoint,然後将狀态合并并儲存到外部存儲媒體中。同步快照可以保證資料的一緻性,但是可能會導緻應用程式的停機時間增加。

在 Flink 中,預設使用異步快照實作 checkpoint。應用程式可以通過設定參數來切換到同步快照模式。例如,應用程式可以設定:

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

來開啟同步快照模式。

Checkpoint 存儲媒體

Flink 支援将 checkpoint 存儲在多種存儲媒體中,如本地檔案系統、HDFS、S3 等。在 Flink 中,checkpoint 存儲媒體由 StateBackend 管理。StateBackend 定義了如何将應用程式的狀态儲存到外部存儲媒體中。

Flink 提供了三種 StateBackend:MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。其中 MemoryStateBackend 會将狀态儲存在 TaskManager 的記憶體中,适合小型應用程式。FsStateBackend 會将狀态儲存在本地檔案系統或 HDFS 中,适合中型應用程式。RocksDBStateBackend 會将狀态儲存在 RocksDB 中,适合大型應用程式。

應用程式可以通過設定參數來選擇 StateBackend。例如,應用程式可以設定:

env.setStateBackend(new RocksDBStateBackend("hdfs://localhost:9000/flink/checkpoints"));

來将狀态儲存在 RocksDB 中,并将 checkpoint 存儲在 HDFS 中。

Checkpoint 配置參數

Flink 提供了許多參數來配置 checkpoint。下面列出了一些常用的參數:

  • checkpoint.interval:指定 checkpoint 的時間間隔。
  • checkpoint.timeout:指定 checkpoint 的逾時時間。
  • checkpoint.max-concurrent-checkpoints:指定同時進行 checkpoint 的最大數量。
  • checkpointing.mode:指定 checkpoint 的實作方式,可以是 EXACTLY_ONCE 或 AT_LEAST_ONCE。
  • state.backend:指定 StateBackend 的類型。
  • state.checkpoints.dir:指定 checkpoint 存儲的目錄。

應用程式可以通過設定這些參數來調整 checkpoint 的性能和容錯性。

Checkpoint 源碼分析

下面我們将通過分析 Flink 的源碼,了解 checkpoint 的實作細節。

JobManager 和 TaskManager 的協作

在 Flink 中,JobManager 和 TaskManager 通過 Actor 模型進行通信。JobManager 和 TaskManager 之間的通信由兩個 Actor 承擔:

  • CheckpointCoordinator:負責管理 checkpoint 的執行,包括發送 checkpoint 請求、處理 checkpoint 請求、合并狀态等工作。
  • Task:負責處理具體的任務,包括執行任務、儲存任務的

狀态、處理 checkpoint 等工作。

CheckpointCoordinator 和 Task 之間的通信是通過 CheckpointMessages 實作的。CheckpointMessages 是一個包含了所有與 checkpoint 相關的消息類型的枚舉類型。例如,CheckpointMessages.TriggerCheckpoint 消息表示觸發 checkpoint,CheckpointMessages.AcknowledgeCheckpoint 消息表示确認 checkpoint。

在 Flink 中,當應用程式啟動時,JobManager 會建立 CheckpointCoordinator,并向 TaskManager 發送注冊消息。TaskManager 收到注冊消息後,會建立一個 CheckpointResponder,并将自己的位址發送給 CheckpointCoordinator。

CheckpointResponder 負責響應 checkpoint 請求。當 CheckpointCoordinator 向 TaskManager 發送 checkpoint 請求時,CheckpointResponder 會在 TaskManager 上執行 checkpoint,并将結果傳回給 CheckpointCoordinator。

Checkpoint 觸發

Flink 支援兩種方式觸發 checkpoint:基于時間的觸發和基于資料量的觸發。基于時間的觸發會在固定的時間間隔内觸發 checkpoint,而基于資料量的觸發會在處理一定數量的資料後觸發 checkpoint。

當應用程式使用基于時間的觸發時,JobManager 會定期向 TaskManager 發送 TriggerCheckpoint 消息。TaskManager 收到 TriggerCheckpoint 消息後,會向 CheckpointCoordinator 發送 TriggerCheckpoint 消息。CheckpointCoordinator 收到 TriggerCheckpoint 消息後,會向所有 TaskManager 發送 TriggerCheckpoint message,并等待所有 TaskManager 的響應。

當應用程式使用基于資料量的觸發時,TaskManager 會維護一個計數器,記錄處理的資料量。當計數器達到指定值時,TaskManager 會向 CheckpointCoordinator 發送 TriggerCheckpoint 消息。CheckpointCoordinator 收到 TriggerCheckpoint 消息後,會向所有 TaskManager 發送 TriggerCheckpoint 消息,并等待所有 TaskManager 的響應。

Checkpoint 處理

當 TaskManager 收到 TriggerCheckpoint 消息後,會執行 checkpoint,并将 checkpoint 結果發送給 CheckpointCoordinator。CheckpointCoordinator 收到 checkpoint 結果後,會将結果存儲在 StateBackend 中。

在異步快照模式下,TaskManager 可能會在完成 checkpoint 後立即開始執行下一個任務。是以,CheckpointCoordinator 需要等待所有 TaskManager 的響應,以確定所有 TaskManager 都已經完成 checkpoint。

當所有 TaskManager 的響應都到達後,CheckpointCoordinator 會将狀态合并,并将結果儲存在外部存儲媒體中。在同步快照模式下,CheckpointCoordinator 會等待所有 TaskManager 完成 checkpoint 後再進行合并和儲存操作。

Checkpoint 恢複

在 Flink 中,當應用程式發生故障時,可以通過 checkpoint 來恢複狀态。Flink 支援兩種恢複模式:精确一次(exactly-once)和至少一次(at-least-once)。

在精确一次模式下,Flink 可以保證在任何情況下都隻會恢複一次狀态。在這種模式下,Flink 使用兩階段送出來實作精确一次。Flink 在第一階段将所有 TaskManager 的狀态從記憶體中寫入到外部存儲媒體中。如果寫入成功,則在第二階段将狀态從外部存儲媒體中恢複到記憶體中。如果寫入失敗,則不會進行恢複操作。

在至少一次模式下,Flink 可以在應用程式發生故障時恢複狀态,但是可能會出現重複計算。在這種模式下,Flink 不會使用兩階段送出來保證恢複一次,而是在每個 checkpoint 中将狀态寫入外部存儲媒體中。在恢複時,Flink 會選擇最新的可用 checkpoint 來進行恢複。

Checkpoint 算法

Flink 中的 checkpoint 算法是基于 Chandy-Lamport 算法的。Chandy-Lamport 算法是一種基于消息傳遞的分布式快照算法,用于捕獲分布式系統的全局狀态。在 Chandy-Lamport 算法中,每個節點都會向其它節點發送 Marker 消息,并記錄收到的 Marker 消息。當節點接收到 Marker 消息後,它會暫停消息傳遞,并生成一個快照,然後繼續消息傳遞。

Flink 中的 checkpoint 算法是在 Chandy-Lamport 算法的基礎上進行擴充的。在 Flink 中,CheckpointCoordinator 會向所有 TaskManager 發送 TriggerCheckpoint 消息,并等待所有 TaskManager 的響應。當 TaskManager 收到 TriggerCheckpoint 消息後,它會停止處理任務,并将狀态寫入外部存儲媒體中。然後,TaskManager 會向 CheckpointCoordinator 發送 AcknowledgeCheckpoint 消息,表示已經完成 checkpoint。

當所有 TaskManager 的 AcknowledgeCheckpoint 消息都到達後,CheckpointCoordinator 會将狀态合并,并将結果儲存到外部存儲媒體中。在恢複時,Flink 會選擇最新的可用 checkpoint 進行恢複,并從外部存儲媒體中讀取 checkpoint 狀态并将其合并到記憶體中。

Flink 中的 checkpoint 算法是一種輕量級的算法,因為它隻在任務開始時向所有 TaskManager 發送 TriggerCheckpoint 消息,并且隻在任務結束時等待所有 TaskManager 的響應。是以,Flink 可以在運作時持續生成 checkpoint,而不會對任務的性能産生重大影響。

Checkpoint 并發控制

Flink 中的 checkpoint 算法是一種并發算法,是以需要對并發通路進行控制。在 Flink 中,CheckpointCoordinator 使用一個 ConcurrentLinkedQueue 來儲存 TriggerCheckpoint 消息,該隊列是線程安全的。當 TriggerCheckpoint 消息進入隊列時,CheckpointCoordinator 會在隊列中插入一個 BarrierMarker,用于标記 TriggerCheckpoint 消息之前的所有消息都已經被處理。當所有 TaskManager 的狀态都被寫入到外部存儲媒體中并且 AcknowledgeCheckpoint 消息被接收到時,CheckpointCoordinator 會删除 BarrierMarker 并繼續處理 TriggerCheckpoint 消息。

在 Flink 中,每個任務都有一個單獨的 checkpoint 算法執行個體,用于控制任務的 checkpoint。在任務開始時,checkpoint 算法會向其它任務發送 BarrierMarker 消息,并等待 BarrierMarker 消息的響應。當收到 BarrierMarker 消息時,任務會停止處理新消息,并将狀态寫入外部存儲媒體中。然後,任務會向發送 BarrierMarker 消息的任務發送 AcknowledgeBarrier 消息,表示已經完成 checkpoint。

當所有任務的 AcknowledgeBarrier 消息都到達後,checkpoint 算法會将狀态合并,并将結果儲存到外部存儲媒體中。在恢複時,Flink 會選擇最新的可用 checkpoint 進行恢複,并從外部存儲媒體中讀取 checkpoint 狀态并将其合并到記憶體中。

Checkpoint 存儲

Flink 支援多種外部存儲媒體,包括檔案系統、HDFS、S3 和 RocksDB 等。在存儲 checkpoint 時,Flink 會将 checkpoint 的 metadata 和狀态分别儲存到不同的檔案中。

checkpoint metadata 包括 checkpoint 的 ID、觸發時間、狀态大小、狀态檔案清單等資訊。狀态檔案清單是一個包含了狀态的檔案路徑和大小的清單。

checkpoint 狀态包括所有已經注冊的狀态,并且可以通過 KeyGroupPartitioner 将狀态分成多個片段,以便在處理大型狀态時進行分布式處理。每個狀态片段都會被儲存到單獨的檔案中。

在使用外部存儲媒體時,Flink 會使用 StateBackend 接口來通路外部存儲媒體。StateBackend 接口包括

如下方法:

  • CheckpointStreamFactory#createCheckpointStateOutputStream(long, long)

: 用于建立用于寫入 checkpoint 狀态的輸出流。

  • CheckpointStreamFactory#createCheckpointStateInputStream(long, long)

: 用于建立用于讀取 checkpoint 狀态的輸入流。

  • CheckpointStreamFactory#createTaskOwnedStateOutputStream(long, long)

: 用于建立用于寫入 task-owned 狀态的輸出流。

  • CheckpointStreamFactory#createSharedStateOutputStream(String, long, long)

: 用于建立用于寫入 shared 狀态的輸出流。

  • CheckpointStreamFactory#supportsHighlyAvailableStreams()

: 用于訓示該 StateBackend 是否支援高可用的流式存儲。

對于支援高可用的流式存儲媒體(如 HDFS),Flink 會在寫入 checkpoint 狀态時使用可重入的檔案鎖來避免多個任務同時寫入同一個檔案。對于不支援高可用的流式存儲媒體(如本地檔案系統),Flink 會在寫入 checkpoint 狀态時使用不同的檔案名,并在恢複時按照先後順序逐個讀取檔案。

Checkpoint 回退

在某些情況下,Flink 可能需要回退到先前的 checkpoint。例如,當出現故障時,Flink 可能會回退到最近的 checkpoint。在 Flink 中,CheckpointCoordinator 可以回退到先前的 checkpoint,并重新啟動任務。Flink 支援兩種類型的回退:任務級别的回退和全局級别的回退。

任務級别的回退是指隻回退單個任務的狀态,而全局級别的回退是指回退所有任務的狀态。在任務級别的回退中,CheckpointCoordinator 将發送一條 CancelCheckpoint 消息來中止目前的 checkpoint,并将狀态恢複到上一個 checkpoint 的狀态。在全局級别的回退中,CheckpointCoordinator 将回退到指定的 checkpoint,并将狀态恢複到該 checkpoint 的狀态。

源碼分析

Flink 的 checkpoint 算法的實作主要在 CheckpointCoordinator 和 CheckpointBarrierHandler 類中。

CheckpointCoordinator 是 Flink 中的核心元件之一,用于協調 checkpoint。它負責觸發 checkpoint、接收任務的 checkpoint 狀态、處理 checkpoint 的确認資訊等。CheckpointCoordinator 還負責管理多個 checkpoint,并在任務失敗時協調回退到先前的 checkpoint。CheckpointCoordinator 實作了 CheckpointCoordinatorInterface 接口,并通過調用 TriggerCheckpoint 方法觸發 checkpoint。

CheckpointBarrierHandler 是一個用于處理 BarrierMarker 和 AcknowledgeBarrier 消息的線程。它負責在任務中發送 BarrierMarker 消息,處理收到的 BarrierMarker 消息,并向發送 BarrierMarker 消息的任務發送 AcknowledgeBarrier 消息。

在 Flink 中,TaskManager 用于執行任務。每個任務都有一個 CheckpointCoordinator 和一個 CheckpointBarrierHandler。當 TaskManager 收到 TriggerCheckpoint 消息時,CheckpointCoordinator 将觸發 checkpoint,并将狀态寫入外部存儲媒體中。當任務收到 BarrierMarker 消息時,CheckpointBarrierHandler 将停止任務的消息處理,并将任務的狀态寫入外部存儲媒體中。在所有任務都收到了 BarrierMarker 消息并将狀态寫入外部存儲媒體中之後,CheckpointCoordinator 将開始确認 checkpoint,并在所有任務都确認了 checkpoint 之後将該 checkpoint 标記為已完成。

下面我們來分析 Flink 的 checkpoint 算法的源代碼實作。

CheckpointCoordinator

CheckpointCoordinator 是 Flink 的核心元件之一,用于協調 checkpoint。它負責觸發 checkpoint、接收任務的 checkpoint 狀态、處理 checkpoint 的确認資訊等。CheckpointCoordinator 還負責管理多個 checkpoint,并在任務失敗時協調回退到先前的 checkpoint。

CheckpointCoordinator 還負責管理多個 checkpoint,并在任務失敗時協調回退到先前的 checkpoint。

首先,我們來看一下 CheckpointCoordinator 的主要字段和構造函數:

public class CheckpointCoordinator implements CheckpointCoordinatorInterface { private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class); private final Object lock = new Object(); private final JobID jobId; private final CheckpointIDCounter checkpointIDCounter; private final long checkpointTimeout; private final long minPauseBetweenCheckpoints; private final int maxConcurrentCheckpoints; private final int checkpointsCleanUpInterval; private final CheckpointFailureManager failureManager; private final ScheduledExecutorService timer; private final ScheduledFuture<?> periodicScheduler; private final Map<Long, PendingCheckpoint> pendingCheckpoints; private final Map<Long, CompletedCheckpoint> completedCheckpoints; private final Set<ExecutionAttemptID> tasksToCommitTo; private final Map<ExecutionAttemptID, Execution> executions; private final Map<JobVertexID, Integer> numSubtasksPerTask; private final CheckpointStorage coordinatorCheckpointStorage; private final CheckpointIDCounter completedCheckpointCounter; private final CompletedCheckpointStore completedCheckpointStore; private final CheckpointStatsTracker statsTracker; private volatile boolean shutdown; public CheckpointCoordinator( JobID jobId, CheckpointIDCounter checkpointIDCounter, long checkpointTimeout, long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints, int checkpointsCleanUpInterval, CheckpointFailureManager failureManager, ScheduledExecutorService timer, Map<ExecutionAttemptID, Execution> executions, Map<JobVertexID, Integer> numSubtasksPerTask, CheckpointStorage coordinatorCheckpointStorage, CompletedCheckpointStore completedCheckpointStore, CheckpointStatsTracker statsTracker) { this.jobId = Preconditions.checkNotNull(jobId); this.checkpointIDCounter = Preconditions.checkNotNull(checkpointIDCounter); this.checkpointTimeout = checkpointTimeout; this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints; this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; this.checkpointsCleanUpInterval = checkpointsCleanUpInterval; this.failureManager = Preconditions.checkNotNull(failureManager); this.timer = Preconditions.checkNotNull(timer); this.pendingCheckpoints = new HashMap<>(); this.completedCheckpoints = new HashMap<>(); this.tasksToCommitTo = new HashSet<>(); this.executions = Preconditions.checkNotNull(executions); this.numSubtasksPerTask = Preconditions.checkNotNull(numSubtasksPerTask); this.coordinatorCheckpointStorage = Preconditions.checkNotNull(coordinatorCheckpointStorage); this.completedCheckpointCounter = new CheckpointIDCounter(); this.completedCheckpointStore = Preconditions.checkNotNull(completedCheckpointStore); this.statsTracker = Preconditions.checkNotNull(statsTracker); this.shutdown = false; // 啟動周期性排程器 this.periodicScheduler = timer.scheduleAtFixedRate( new Runnable() { @Override public void run() { try { triggerPeriodicCheckpoint(true); } catch (Exception e) { LOG.error("Exception while triggering periodic checkpoint", e); } } }, minPauseBetweenCheckpoints, minPauseBetweenCheckpoints, TimeUnit.MILLISECONDS); } ... }

我們可以看到,CheckpointCoordinator 中有許多字段,其中包括:

  • jobId:該作業的 JobID。
  • checkpointIDCounter:用于生成 checkpoint ID 的計數器。
  • checkpointTimeout:checkpoint 的逾時時間。
  • minPauseBetweenCheckpoints:兩次 checkpoint 之間的最小間隔時間。
  • maxConcurrentCheckpoints:最大并發 checkpoint 數量。
  • checkpointsCleanUpInterval:清理過期 checkpoint 的時間間隔。
  • failureManager:用于處理 checkpoint 失敗的 CheckpointFailureManager。
  • timer:用于周期性執行 checkpoint 的 ScheduledExecutorService。
  • periodicScheduler:周期性執行 checkpoint 的 ScheduledFuture。
  • pendingCheckpoints:尚未完成的 checkpoint 集合。
  • completedCheckpoints:已完成的 checkpoint 集合。
  • tasksToCommitTo:需要将 checkpoint commit 給的任務集合。
  • executions:目前正在運作的任務 Execution 的集合。
  • numSubtasksPerTask:每個任務(JobVertex)的子任務數量。
  • coordinatorCheckpointStorage:協調員 checkpoint 存儲。
  • completedCheckpointCounter:已完成 checkpoint 的計數器。
  • completedCheckpointStore:已完成 checkpoint 的存儲。
  • statsTracker:Checkpoint 統計資訊的跟蹤器。
  • shutdown:是否已關閉。

在構造函數中,我們可以看到 CheckpointCoordinator 啟動了一個周期性排程器 periodicScheduler,用于周期性觸發 checkpoint。這裡傳入了一個 Runnable 對象,其中的 triggerPeriodicCheckpoint(true) 方法用于觸發周期性 checkpoint。這裡的 true 參數表示該 checkpoint 是周期性的。

CheckpointCoordinator 主要有三個方法:

  • triggerCheckpoint:觸發一個 checkpoint。
  • triggerPeriodicCheckpoint:觸發周期性 checkpoint。
  • receiveAcknowledgeMessage:接收 checkpoint 完成的确認消息。

其中,receiveAcknowledgeMessage 方法是接收 checkpoint 完成确認消息的方法,是以,我們先來看一下這個方法:

@Override public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String senderAddress) throws Exception { if (shutdown) { LOG.debug("Coordinator is shutdown. Discarding message."); return; } ExecutionAttemptID executionAttemptId = message.getTaskExecutionId(); // 将接收到的 AcknowledgeCheckpoint 消息的 Execution 标記為已确認 checkpoint final Execution execution = executions.get(executionAttemptId); if (execution != null) { execution.acknowledgeCheckpoint(message.getCheckpointId(), message.getSubtaskState(), senderAddress); } else { LOG.warn("Received AcknowledgeCheckpoint message for non-existing execution " + executionAttemptId); } PendingCheckpoint p = pendingCheckpoints.get(message.getCheckpointId()); if (p == null) { // checkpoint 已過期,直接傳回 if (isCheckpointExpired(message.getCheckpointId())) { LOG.debug("Received late message for now expired checkpoint attempt " + message.getCheckpointId() + " : " + message); return; } else { // checkpoint 尚未過期,可能是因為 Master 節點和 Worker 節點網絡不穩定導緻消息延遲,将消息存儲起來等待 checkpoint 完成 // 如果還沒有 PendingCheckpoint 對象,則建立一個新的 PendingCheckpoint 對象并将 AcknowledgeCheckpoint 消息存儲到其中。 // 由于 AcknowledgeCheckpoint 消息可能會先于 TriggerCheckpoint 消息到達,是以 PendingCheckpoint 對象可能還不存在。 LOG.debug("Received AcknowledgeCheckpoint message for an unknown checkpoint attempt " + message.getCheckpointId() + " : " + message); p = new PendingCheckpoint( jobId, message, numSubtasksPerTask, checkpointTimeout, checkpointProperties, alignmentTimeout);

// 将消息存儲到 PendingCheckpoint 對象中 p.addSubtaskAcknowledgement(message.getTaskExecutionId(), message.getSubtaskState(), senderAddress); pendingCheckpoints.put(p.getCheckpointId(), p); } } else { // checkpoint 尚未完成,将 AcknowledgeCheckpoint 消息存儲到對應的 PendingCheckpoint 對象中 // 添加一個子任務的确認消息 p.addSubtaskAcknowledgement(message.getTaskExecutionId(), message.getSubtaskState(), senderAddress); // 如果該 PendingCheckpoint 對象已收到所有子任務的确認消息,則将該 checkpoint 切換為完成狀态 if (p.isFullyAcknowledged()) { pendingCheckpoints.remove(p.getCheckpointId()); try { finalizeCheckpoint(p); } catch (Throwable t) { // 處理 checkpoint 失敗 handleCheckpointException(p, t); } } }

}

receiveAcknowledgeMessage 方法主要實作了接收 checkpoint 完成确認消息的邏輯。當接收到 AcknowledgeCheckpoint 消息時,首先根據 ExecutionAttemptID 找到對應的 Execution,然後将該 Execution 的 checkpoint 狀态設定為已完成。 接着,如果該 checkpoint 已經完成或過期,就直接傳回。如果該 checkpoint 尚未完成,就将 AcknowledgeCheckpoint 消息存儲到對應的 PendingCheckpoint 對象中,并判斷該 checkpoint 是否已經完成。如果該 PendingCheckpoint 對象已經收到所有子任務的确認消息,則将該 checkpoint 切換為完成狀态,并調用 finalizeCheckpoint 方法進行後續處理。 finalizeCheckpoint 方法的具體實作如下: ```java private void finalizeCheckpoint(PendingCheckpoint checkpoint) throws Exception { // Checkpoints have to be locked while they are being confirmed. Otherwise, we risk that // messages get sent between the the confirmation of the checkpoint and the execution of // the subsequent commands, and the messages are not covered by the checkpoint. synchronized (checkpoint) { // 檢查 checkpoint 是否過期 if (isCheckpointExpired(checkpoint.getCheckpointId())) { discardCheckpoint(checkpoint); return; } // 将 PendingCheckpoint 轉換為 CompletedCheckpoint CompletedCheckpoint completed = checkpoint.finalizeCheckpoint(); // 将 CompletedCheckpoint 存儲到 CompletedCheckpointStore 中 try { completedCheckpointStore.addCheckpoint(completed); } catch (Exception e) { // 存儲失敗,觸發檢查點復原 handleCheckpointException(checkpoint, new Exception("Could not complete the checkpoint: " + e.getMessage(), e)); return; } // 将 CompletedCheckpoint 發送給需要将其 commit 的任務 for (TaskStateSnapshot subtaskState : completed.getTaskStates()) { tasksToCommitTo.add(new TaskStateSnapshotEntry(subtaskState)); } // 更新 checkpoint 計數器 completedCheckpointCounter.incrementAndGet(); // 清理過期的 checkpoint cleanUpCheckpoints(true); // 發送 CompletedCheckpoint 給外部 sink notifyCheckpointComplete(completed); } }

finalizeCheckpoint 方法主要實作了 checkpoint 完成時的後續處理邏輯。該方法首先檢查 checkpoint 是否過期,如果已經過期,則将該 checkpoint 删除。

如果 checkpoint 沒有過期,則将 PendingCheckpoint 對象轉換為 CompletedCheckpoint 對象,并将其存儲到 CompletedCheckpointStore 中。

CompletedCheckpointStore 是 Flink 内部用來存儲已完成的 checkpoint 的存儲系統,它預設使用 Flink 的記憶體狀态後端進行存儲。Flink 提供了多種狀态後端,可以将已完成的 checkpoint 存儲在記憶體、檔案系統、HDFS、RocksDB 等不同的存儲系統中。

在存儲 CompletedCheckpoint 之後,finalizeCheckpoint 方法會将 CompletedCheckpoint 發送給需要将其 commit 的任務,并更新 checkpoint 計數器和清理過期的 checkpoint。最後,該方法會發送 CompletedCheckpoint 給外部 sink,通知 sink 已經完成該 checkpoint。

Checkpoint 的恢複和恢複流程

當 Flink 任務發生故障時,需要将其恢複到故障發生時的狀态,以便于繼續運作。Checkpoint 提供了一種實作任務恢複的機制。Flink 的恢複流程一般如下:

  1. 首先,Flink 會根據 CompletedCheckpointStore 中存儲的 checkpoint 資訊來恢複任務的狀态。
  2. 然後,Flink 會将從 checkpoint 中恢複的狀态重新配置設定給任務,以便于任務繼續執行。
  3. 最後,Flink 會使用資料源中的資料來恢複任務的計算結果。

在 Flink 中,任務的恢複流程是由 JobManager 控制的。具體來說,當任務發生故障時,JobManager 會嘗試從最近的一個 CompletedCheckpoint 中恢複任務狀态。如果恢複成功,則任務可以從故障發生時的狀态繼續執行;否則,任務會被停止。

下面我們來看一下 Flink 的恢複流程具體是如何實作的。當任務啟動時,JobManager 會建立一個 ExecutionGraph 對象,用來描述整個任務的執行圖。ExecutionGraph 中包含了所有的 Execution 和它們之間的依賴關系。同時,JobManager 也會建立一個 ExecutionEnvironment 對象,用來管理整個任務的執行過程。

當任務發生故障時,JobManager 會根據 ExecutionGraph 和 ExecutionEnvironment 來嘗試恢複任務的狀态。具體來說,JobManager 會根據 ExecutionGraph 中存儲的任務執行資訊和 CompletedCheckpointStore 中存儲的 checkpoint 資訊,來建立需要恢複的 Execution 和它們的任務狀态。然後,JobManager 會将這些任務狀态重新配置設定給對應的 Execution,并使用資料源中的資料來恢複任務的計算結果。如果恢複成功,則任務可以從故障發生時的狀态繼續執行;否則,任務會被停止。

Flink 支援兩種恢複模式:精确一次和至少一次。精确一次恢複模式要求所有的資料都要被恰好一次地處理,是以在恢複時需要保證所有的資料都隻被處理一次,進而保證結果的準确性。至少一次恢複模式則不需要保證所有的資料隻被處理一次,但可以在資料重複處理時快速恢複任務的狀态。

在精确一次恢複模式下,Flink 會将已完成的 checkpoint 的所有狀态都恢複到任務中。Flink 還提供了恢複狀态時的兩種模式:重放模式和合并模式。在重放模式下,Flink 會将所有的狀态都恢複到任務中,然後重新計算所有的資料;在合并模式下,Flink 會将已完成的 checkpoint 中的狀态合并到任務中,然後繼續從故障發生點繼續執行。重放模式需要更多的時間和資源來恢複狀态,但它能夠保證資料隻被處理一次,進而保證結果的準确性。而合并模式則可以快速地恢複任務的狀态,但可能會導緻資料重複處理,進而影響結果的準确性。

在至少一次恢複模式下,Flink 會嘗試恢複任務的狀态,但不會保證資料隻被處理一次。在這種模式下,Flink 會将已完成的 checkpoint 的狀态合并到任務中,然後繼續從故障發生點繼續執行。當 Flink 接收到一條新的資料時,它會先檢查該資料是否已經被處理過。如果該資料已經被處理過,則 Flink 會将該資料丢棄,否則 Flink 會将該資料處理并将其輸出。由于資料可能會重複處理,是以在至少一次恢複模式下,Flink 不會保證結果的準确性,但可以快速地恢複任務的狀态。

Checkpoint 的并發和容錯性

Flink 的 checkpoint 機制具有很好的并發和容錯性。當 Flink 處理大量資料時,它可以将資料分成多個分區,并将每個分區配置設定給不同的任務來處理。由于每個任務都可以在不同的線程中執行,是以可以利用多核 CPU 來提高處理速度。在這種情況下,每個任務都會在它自己的時間軸上執行,是以它們不需要互相協調。

同時,Flink 的 checkpoint 機制還具有很好的容錯性。當任務發生故障時,Flink 可以通過恢複 checkpoint 來恢複任務的狀态。如果一個任務執行失敗,那麼它的所有狀态都會被恢複到 checkpoint 的狀态,進而保證任務的執行結果的正确性。

總結

Checkpoint 是 Flink 中非常重要的一個機制,它可以保證 Flink 的任務在發生故障時能夠快速恢複到故障發生前的狀态。Checkpoint 不僅能夠提高 Flink 的容錯性,還能夠提高任務的吞吐量和性能。在 checkpoint 機制中,Flink 通過周期性地建立 checkpoint 來儲存任務的狀态。Flink 支援兩種恢複模式:精确一次恢複模式和至少一次恢複模式。精确一次恢複模式需要保證所有的資料隻被處理一次,進而保證結果的準确性。至少一次恢複模式不需要保證所有的資料隻被處理一次,但可以在資料重複處理時快速恢複任務的狀态。在 checkpoint 機制中,Flink 還具有很好的并發性和容錯性,它可以将資料分成多個分區,并将每個分區配置設定給不同的任務來處理,同時它還能夠通過恢複 checkpoint 來恢複任務的狀态。

下面是 checkpoint 的源碼分析:

Flink 中 checkpoint 的實作主要是由 CheckpointCoordinator 和 CheckpointBarrierHandler 兩個類實作的。

CheckpointCoordinator

CheckpointCoordinator 是 Flink 中 checkpoint 的核心類。它負責周期性地建立 checkpoint 并管理 checkpoint 的狀态。CheckpointCoordinator 主要有以下幾個重要的方法:

  • triggerCheckpoint():觸發一個新的 checkpoint。
  • receiveAcknowledgeMessage():處理任務對 checkpoint 的确認消息。
  • receiveDeclineMessage():處理任務對 checkpoint 的拒絕消息。
  • receiveBarrier():處理任務發送的 checkpoint barrier。
  • abortPendingCheckpoints():取消所有未完成的 checkpoint。

在 CheckpointCoordinator 中,Flink 使用了一個雙重循環的算法來實作 checkpoint。該算法中,Flink 首先建立一個新的 checkpoint,然後向所有任務發送一個 checkpoint barrier。當任務收到 checkpoint barrier 後,它會停止處理新的資料,并将已處理的資料緩存起來,然後向 CheckpointCoordinator 發送确認消息。當 CheckpointCoordinator 收到所有任務的确認消息後,它會将 checkpoint 标記為完成,并将 checkpoint 的狀态儲存到持久化存儲中。如果任務發生故障,Flink 可以通過恢複 checkpoint 來恢複任務的狀态。

CheckpointBarrierHandler

CheckpointBarrierHandler 是 Flink 中處理 checkpoint barrier 的核心類。它負責接收和處理任務發送的 checkpoint barrier,然後将 checkpoint barrier 發送給下遊任務。

CheckpointBarrierHandler 主要有以下幾個重要的方法:

  • processBarrier():處理任務發送的 checkpoint barrier。
  • processSubtaskState():處理任務發送的子任務狀态。

在 CheckpointBarrierHandler 中,Flink 使用了一種基于連結清單的資料結構來實作任務之間的通信。該資料結構中,每個任務都維護了一個連結清單,該連結清單包含了所有已接收但未處理的 checkpoint barrier。當任務處理完一個 checkpoint barrier 後,它會将該 checkpoint barrier 從連結清單中删除,并将下一個 checkpoint barrier 發送給下遊任務。如果任務發生故障,Flink 可以通過恢複 checkpoint來恢複任務的狀态,并使用連結清單來重放已接收但未處理的 checkpoint barrier。

Checkpoint 的配置

在 Flink 中,可以通過配置來調整 checkpoint 的行為和性能。以下是一些常用的配置參數:

  • state.checkpoints.dir:指定 checkpoint 的儲存目錄。
  • state.backend: 設定 state backend,支援的 state backend 有 MemoryStateBackend、FsStateBackend、RocksDBStateBackend 等。
  • state.checkpoints.num-retained:設定保留的最大 checkpoint 數量。
  • state.checkpoints.interval:設定兩個 checkpoint 之間的間隔時間。
  • state.checkpoints.timeout:設定 checkpoint 的逾時時間。

當然,這隻是部分常用的配置參數,具體的配置參數可以參考 Flink 的官方文檔。

總結

在分布式計算中,容錯性和恢複能力是非常重要的,而 checkpoint 機制正是解決這個問題的關鍵。Flink 通過 checkpoint 機制實作了高效的容錯和恢複,不僅可以提高任務的吞吐量和性能,還能夠保證結果的準确性和一緻性。本文通過對 Flink checkpoint 的源碼分析,介紹了 checkpoint 的實作原理和核心類,以及常用的配置參數。

繼續閱讀