
Understanding the Flink Checkpoint Source Code

References: https://blog.jrwang.me/2019/flink-source-code-checkpoint/#checkpoint-%E7%9A%84%E5%8F%91%E8%B5%B7%E6%B5%81%E7%A8%8B

https://cloud.tencent.com/developer/article/1593969

https://blog.csdn.net/zc19921215/article/details/108171455

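A Flink checkpoint mainly consists of generating barriers, propagating barriers, and saving state. This walk-through of the source code follows those same parts.

Background:

  • The snapshot of each operator is abstracted as OperatorSnapshotFutures, which holds the snapshot results of both operator state and keyed state
  • CheckpointCoordinator plays a central role during a checkpoint. It is the "coordinator" of the whole process and is responsible for
    • sending the messages that trigger a checkpoint and receiving each task's response (Ack)
    • maintaining a global view of the state handles carried in those Acks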

Start .......

If checkpointing is enabled for the job, DefaultExecutionGraphBuilder calls executionGraph.enableCheckpointing():

class DefaultExecutionGraphBuilder

// configure the state checkpointing
if (isCheckpointingEnabled(jobGraph)) {
    // ... N lines omitted ...
    executionGraph.enableCheckpointing(
        chkConfig,
        hooks,
        checkpointIdCounter,
        completedCheckpointStore,
        rootBackend,
        rootStorage,
        checkpointStatsTracker,
        checkpointsCleaner);
}
           

This creates the CheckpointCoordinator object

class  DefaultExecutionGraph

  // create the coordinator that triggers and commits checkpoints and holds the state
        checkpointCoordinator =
                new CheckpointCoordinator(
                        jobInformation.getJobId(),
                        chkConfig,
                        operatorCoordinators,
                        checkpointIDCounter,
                        checkpointStore,
                        checkpointStorage,
                        ioExecutor,
                        checkpointsCleaner,
                        new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
                        SharedStateRegistry.DEFAULT_FACTORY,
                        failureManager,
                        createCheckpointPlanCalculator(
                                chkConfig.isEnableCheckpointsAfterTasksFinish()),
                        new ExecutionAttemptMappingProvider(getAllExecutionVertices()));
           

and registers a job status listener, CheckpointCoordinatorDeActivator

class CheckpointCoordinator

// ------------------------------------------------------------------------
    //  job status listener that schedules / cancels periodic checkpoints
    // ------------------------------------------------------------------------

    public JobStatusListener createActivatorDeactivator() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            if (jobStatusListener == null) {
                jobStatusListener = new CheckpointCoordinatorDeActivator(this);
            }

            return jobStatusListener;
        }
    }
           

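CheckpointCoordinatorDeActivator is notified whenever the job status changes. When the status becomes RUNNING, it starts the checkpoint timer through CheckpointCoordinator.startCheckpointScheduler(); for any other status it stops the timer.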

public class CheckpointCoordinatorDeActivator implements JobStatusListener {
    // ... N lines omitted ...
    @Override
    public void jobStatusChanges(
            JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            coordinator.startCheckpointScheduler();
        } else {
            // anything else should stop the trigger for now
            coordinator.stopCheckpointScheduler();
        }
    }
}
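
The start/stop behaviour of the listener can be imitated with the JDK alone. The following is only a minimal sketch: MiniCoordinator, MiniDeActivator and SchedulerSketch are hypothetical names, job statuses are plain strings, and the "checkpoint" is just a counter increment. It assumes nothing about Flink's real classes beyond what the excerpt above shows.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the coordinator; not Flink's CheckpointCoordinator.
class MiniCoordinator {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final long intervalMillis;
    private final AtomicLong triggered = new AtomicLong();
    private ScheduledFuture<?> periodicTrigger; // null while the scheduler is stopped

    MiniCoordinator(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    synchronized void startScheduler() {
        stopScheduler(); // never keep two periodic triggers alive
        periodicTrigger = timer.scheduleAtFixedRate(
                this::triggerCheckpoint, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void stopScheduler() {
        if (periodicTrigger != null) {
            periodicTrigger.cancel(false);
            periodicTrigger = null;
        }
    }

    synchronized boolean isSchedulerRunning() {
        return periodicTrigger != null;
    }

    // Placeholder for "trigger one checkpoint": only counts invocations.
    void triggerCheckpoint() {
        triggered.incrementAndGet();
    }

    long triggeredCount() {
        return triggered.get();
    }

    void shutdown() {
        stopScheduler();
        timer.shutdownNow();
    }
}

// Hypothetical listener: starts the timer on RUNNING, stops it on anything else.
class MiniDeActivator {
    private final MiniCoordinator coordinator;

    MiniDeActivator(MiniCoordinator coordinator) {
        this.coordinator = coordinator;
    }

    void jobStatusChanges(String newStatus) {
        if ("RUNNING".equals(newStatus)) {
            coordinator.startScheduler();
        } else {
            coordinator.stopScheduler();
        }
    }
}

class SchedulerSketch {
    public static void main(String[] args) {
        MiniCoordinator coordinator = new MiniCoordinator(60_000);
        MiniDeActivator listener = new MiniDeActivator(coordinator);
        listener.jobStatusChanges("RUNNING");
        System.out.println("running: " + coordinator.isSchedulerRunning());
        listener.jobStatusChanges("FAILING");
        System.out.println("running: " + coordinator.isSchedulerRunning());
        coordinator.shutdown();
    }
}
```

Treating every status other than RUNNING as "stop" mirrors the else branch of the excerpt, so the timer cannot keep firing while the job is restarting or failing.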
           

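The timer task is wrapped as a ScheduledTrigger. When it runs, it calls CheckpointCoordinator.triggerCheckpoint() to trigger one checkpoint.

CheckpointCoordinator sends the message that triggers the checkpoint, which ends up as an RPC call to TaskExecutorGateway.triggerCheckpoint (TaskExecutorGateway is an interface; the implementation is in TaskExecutor), that is, a request to run TaskExecutor.triggerCheckpoint(). Because several Tasks may be running in one TaskExecutor, it uses the ExecutionAttemptID of the trigger request to find the matching Task and then calls Task.triggerCheckpointBarrier(). Only Tasks that act as sources have triggerCheckpointBarrier() invoked this way.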

public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
    // ... N lines omitted ...

 final Task task = taskSlotTable.getTask(executionAttemptID);

   if (task != null) {
    task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

    return CompletableFuture.completedFuture(Acknowledge.get());
} else{
    // ... N lines omitted ...
} 
}
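
The lookup-then-delegate step can be sketched without Flink. MiniTask and MiniTaskExecutor below are hypothetical, attempt IDs are plain strings, and the failure branch (a future completed exceptionally) is an assumption made for illustration, since the excerpt above elides what the real else branch does.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical task: only remembers the last checkpoint it was asked to start.
class MiniTask {
    long lastTriggeredCheckpoint = -1;

    void triggerCheckpointBarrier(long checkpointId) {
        lastTriggeredCheckpoint = checkpointId;
    }
}

// Hypothetical executor hosting several tasks, keyed by execution attempt ID.
class MiniTaskExecutor {
    private final Map<String, MiniTask> tasksByAttemptId = new ConcurrentHashMap<>();

    void register(String attemptId, MiniTask task) {
        tasksByAttemptId.put(attemptId, task);
    }

    CompletableFuture<String> triggerCheckpoint(String attemptId, long checkpointId) {
        MiniTask task = tasksByAttemptId.get(attemptId);
        if (task == null) {
            // Assumed behaviour for this sketch: report the unknown attempt to the caller.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalStateException("No task for attempt " + attemptId));
            return failed;
        }
        task.triggerCheckpointBarrier(checkpointId);
        return CompletableFuture.completedFuture("ACK");
    }
}

class DispatchSketch {
    public static void main(String[] args) {
        MiniTaskExecutor executor = new MiniTaskExecutor();
        MiniTask source = new MiniTask();
        executor.register("attempt-1", source);
        System.out.println(executor.triggerCheckpoint("attempt-1", 7L).join());
        System.out.println(source.lastTriggeredCheckpoint);
    }
}
```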
           

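The actual checkpoint logic of a Task is encapsulated in the AbstractInvokable class, which has two methods that trigger a checkpoint

  • triggerCheckpointAsync: invoked on behalf of the checkpoint coordinator. This is where a checkpoint originates: barriers are injected at the source tasks and flow downstream from there
  • triggerCheckpointOnBarrier: invoked on downstream nodes when they receive barriers from upstream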
class AbstractInvokable 

  public Future<Boolean> triggerCheckpointAsync(
            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        throw new UnsupportedOperationException(
                String.format(
                        "triggerCheckpointAsync not supported by %s", this.getClass().getName()));
    }


  public void triggerCheckpointOnBarrier(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetricsBuilder checkpointMetrics)
            throws IOException {
        throw new UnsupportedOperationException(
                String.format(
                        "triggerCheckpointOnBarrier not supported by %s",
                        this.getClass().getName()));
    }
           

The concrete implementations of these two methods differ in minor ways, but the main logic is the same: both end up calling performCheckpoint()

class StreamTask
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws Exception {
             // ... N lines omitted ...
             subtaskCheckpointCoordinator.checkpointState(
        checkpointMetaData,
        checkpointOptions,
        checkpointMetrics,
        operatorChain,
        finishedOperators,
        this::isRunning);             
        }
           

 checkpointState does two things: 1) it first sends the barrier downstream, and 2) it stores the checkpoint snapshot

class SubtaskCheckpointCoordinatorImpl

// Step (2): send the barrier downstream

operatorChain.broadcastEvent(
        new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
        // true for an unaligned checkpoint, which changes how the barrier is handled
        options.isUnalignedCheckpoint());
 
 // Step (4): take the snapshot

Map<OperatorID, OperatorSnapshotFutures> snapshotFutures =
        new HashMap<>(operatorChain.getNumberOfOperators());
try {
    if (takeSnapshotSync(
            snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
        finishAndReportAsync(
                snapshotFutures,
                metadata,
                metrics,
                operatorChain.isFinishedOnRestore(),
                isOperatorsFinished,
                isRunning);
    } else {
        cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
    }
} catch (Exception ex) {
    cleanup(snapshotFutures, metadata, metrics, ex);
    throw ex;
}  
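
The ordering in the excerpt (barrier first, then a synchronous snapshot, then asynchronous completion and reporting) can be sketched as follows. MiniSubtaskCheckpoint is hypothetical; its "state" is a single running sum and the asynchronous phase simply hands the copied value back through a CompletableFuture. It is an illustration of the ordering, not of Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical subtask whose state is one running sum.
class MiniSubtaskCheckpoint {
    final List<String> events = new ArrayList<>(); // order of checkpoint steps
    private long sumState = 0;

    void process(long value) {
        sumState += value;
    }

    CompletableFuture<Long> checkpointState(long checkpointId) {
        // 1) Emit the barrier downstream before touching the state, so that
        //    downstream tasks can start their own checkpoint early.
        events.add("barrier-" + checkpointId);

        // 2) Synchronous phase: capture a consistent copy of the state.
        final long snapshot = sumState;
        events.add("snapshot-" + checkpointId);

        // 3) Asynchronous phase: "persist" the copy and report the result,
        //    while the task thread is free to process records again.
        return CompletableFuture.supplyAsync(() -> snapshot);
    }
}

class CheckpointStateSketch {
    public static void main(String[] args) {
        MiniSubtaskCheckpoint subtask = new MiniSubtaskCheckpoint();
        subtask.process(3);
        subtask.process(4);
        CompletableFuture<Long> reported = subtask.checkpointState(1);
        subtask.process(10); // arrives after the snapshot, not part of checkpoint 1
        System.out.println(subtask.events + " -> " + reported.join());
    }
}
```

The record processed after checkpointState returns does not change the reported value, because the copy was taken in the synchronous phase.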
           

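 This touches on aligned-barrier checkpoints versus unaligned-barrier checkpoints (unaligned checkpoint), which are explained further below. Both can guarantee exactly-once.

Step 4 shows that the state snapshot is taken first and the result is then reported to the JobManager. The report eventually reaches the CheckpointCoordinator through RPC.

After a Task finishes its checkpoint work, CheckpointCoordinator receives the Ack. The Ack is handled as follows:

  • Use the checkpointID in the Ack to look up the matching PendingCheckpoint in Map<Long, PendingCheckpoint> pendingCheckpoints. A PendingCheckpoint is a checkpoint that has been triggered but has not completed yet
  • If a matching PendingCheckpoint exists
    • and it has not been discarded, call PendingCheckpoint.acknowledgeTask to process the Ack. Depending on the result:
      • SUCCESS: check whether all required Acks have been received (covered again below). If so, call completePendingCheckpoint to complete this checkpoint
      • DUPLICATE: the Ack was received more than once and is ignored
      • UNKNOWN: the Ack is unknown; clean up the state handles carried in the reported Ack
      • DISCARDED: the checkpoint has already been discarded; clean up the state handles carried in the reported Ack
    • and it has already been discarded, throw an exception

  • If no matching PendingCheckpoint exists, clean up the state handles carried in the reported Ack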

The source code is as follows:

class CheckpointCoordinator

synchronized (lock) {
    // we need to check inside the lock for being shutdown as well, otherwise we
    // get races and invalid error log messages
    if (shutdown) {
        return false;
    }

    final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

    if (checkpoint != null && !checkpoint.isDisposed()) {

        switch (checkpoint.acknowledgeTask(
                message.getTaskExecutionId(),
                message.getSubtaskState(),
                message.getCheckpointMetrics(),
                getStatsCallback(checkpoint))) {
            case SUCCESS:
                LOG.debug(
                        "Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
                        checkpointId,
                        message.getTaskExecutionId(),
                        message.getJob(),
                        taskManagerLocationInfo);

                if (checkpoint.isFullyAcknowledged()) {
                    completePendingCheckpoint(checkpoint);
                }
                break;
            case DUPLICATE:
                LOG.debug(
                        "Received a duplicate acknowledge message for checkpoint {}, task {}, job {}, location {}.",
                        message.getCheckpointId(),
                        message.getTaskExecutionId(),
                        message.getJob(),
                        taskManagerLocationInfo);
                break;
            case UNKNOWN:
                LOG.warn(
                        "Could not acknowledge the checkpoint {} for task {} of job {} at {}, "
                                + "because the task's execution attempt id was unknown. Discarding "
                                + "the state handle to avoid lingering state.",
                        message.getCheckpointId(),
                        message.getTaskExecutionId(),
                        message.getJob(),
                        taskManagerLocationInfo);

                discardSubtaskState(
                        message.getJob(),
                        message.getTaskExecutionId(),
                        message.getCheckpointId(),
                        message.getSubtaskState());

                break;
            case DISCARDED:
                LOG.warn(
                        "Could not acknowledge the checkpoint {} for task {} of job {} at {}, "
                                + "because the pending checkpoint had been discarded. Discarding the "
                                + "state handle tp avoid lingering state.",
                        message.getCheckpointId(),
                        message.getTaskExecutionId(),
                        message.getJob(),
                        taskManagerLocationInfo);

                discardSubtaskState(
                        message.getJob(),
                        message.getTaskExecutionId(),
                        message.getCheckpointId(),
                        message.getSubtaskState());
        }

        return true;
    } else if (checkpoint != null) {
        // this should not happen
        throw new IllegalStateException(
                "Received message for discarded but non-removed checkpoint "
                        + checkpointId);
    } else {
        reportStats(
                message.getCheckpointId(),
                message.getTaskExecutionId(),
                message.getCheckpointMetrics());
        boolean wasPendingCheckpoint;

        // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
        if (recentPendingCheckpoints.contains(checkpointId)) {
            wasPendingCheckpoint = true;
            LOG.warn(
                    "Received late message for now expired checkpoint attempt {} from task "
                            + "{} of job {} at {}.",
                    checkpointId,
                    message.getTaskExecutionId(),
                    message.getJob(),
                    taskManagerLocationInfo);
        } else {
            LOG.debug(
                    "Received message for an unknown checkpoint {} from task {} of job {} at {}.",
                    checkpointId,
                    message.getTaskExecutionId(),
                    message.getJob(),
                    taskManagerLocationInfo);
            wasPendingCheckpoint = false;
        }

        // try to discard the state so that we don't have lingering state lying around
        discardSubtaskState(
                message.getJob(),
                message.getTaskExecutionId(),
                message.getCheckpointId(),
                message.getSubtaskState());

        return wasPendingCheckpoint;
    }
}
           

How does PendingCheckpoint handle Ack messages? Internally it maintains two maps:

  • Map<OperatorID, OperatorState> operatorStates; : the state handles of the operators whose Acks have been received
  • Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;: the Tasks that must Ack but have not done so yet

Each time an Ack arrives, PendingCheckpoint removes the matching Task from notYetAcknowledgedTasks and stores the state handles carried in the Ack. When notYetAcknowledgedTasks is empty, every Ack has been received.
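
This bookkeeping can be sketched with two collections. The code below is a simplified, hypothetical model: MiniPendingCheckpoint and AckResult are made-up names, attempt IDs and state handles are plain strings, and state is keyed by attempt rather than by OperatorID. The four results mirror the cases in the switch statement shown earlier.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

// Hypothetical, simplified model of a pending checkpoint.
class MiniPendingCheckpoint {
    private final Set<String> notYetAcknowledged;                    // tasks still owing an Ack
    private final Set<String> acknowledged = new HashSet<>();        // tasks that already acked
    private final Map<String, String> stateHandles = new HashMap<>(); // attempt -> handle
    private boolean disposed;

    MiniPendingCheckpoint(Set<String> tasksToAck) {
        this.notYetAcknowledged = new HashSet<>(tasksToAck);
    }

    AckResult acknowledgeTask(String attemptId, String stateHandle) {
        if (disposed) {
            return AckResult.DISCARDED;
        }
        if (!notYetAcknowledged.remove(attemptId)) {
            // Either this task acked before, or it was never part of the checkpoint.
            return acknowledged.contains(attemptId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
        }
        acknowledged.add(attemptId);
        stateHandles.put(attemptId, stateHandle);
        return AckResult.SUCCESS;
    }

    boolean isFullyAcknowledged() {
        return !disposed && notYetAcknowledged.isEmpty();
    }

    void dispose() {
        disposed = true;
    }
}

class AckSketch {
    public static void main(String[] args) {
        MiniPendingCheckpoint pending =
                new MiniPendingCheckpoint(new HashSet<>(Arrays.asList("task-a", "task-b")));
        System.out.println(pending.acknowledgeTask("task-a", "handle-a"));
        System.out.println(pending.acknowledgeTask("task-a", "handle-a"));
        System.out.println(pending.acknowledgeTask("task-x", "handle-x"));
        System.out.println(pending.acknowledgeTask("task-b", "handle-b"));
        System.out.println(pending.isFullyAcknowledged());
    }
}
```

In the real coordinator the caller is responsible for discarding the state handle when the result is UNKNOWN or DISCARDED; this sketch only returns the result.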

Once PendingCheckpoint confirms that every Ack has been received, the checkpoint can be completed. The check for whether all Acks have arrived happens in the SUCCESS case:

if (checkpoint.isFullyAcknowledged()) {
                    completePendingCheckpoint(checkpoint);
                }
           

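Once the Acks from all operators have been received, the following happens:

  • PendingCheckpoint.finalizeCheckpoint() is called to turn the PendingCheckpoint into a CompletedCheckpoint
  • The CompletedCheckpoint is saved to the CompletedCheckpointStore
  • Subsumed PendingCheckpoints are removed. Because CheckpointIDs increase monotonically, every PendingCheckpoint with an ID smaller than the one just completed can be discarded
  • Execution.notifyCheckpointComplete() is called in turn to notify all Tasks that the current checkpoint has completed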
class CheckpointCoordinator
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
        throws CheckpointException {

 // call PendingCheckpoint.finalizeCheckpoint() to turn the PendingCheckpoint into a CompletedCheckpoint
            completedCheckpoint =
                    pendingCheckpoint.finalizeCheckpoint(
                            checkpointsCleaner,
                            this::scheduleTriggerRequest,
                            executor,
                            getStatsCallback(pendingCheckpoint));
                            
// save the CompletedCheckpoint to the CompletedCheckpointStore
completedCheckpointStore.addCheckpoint(
        completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);

    // remove the subsumed PendingCheckpoints
    dropSubsumedCheckpoints(checkpointId);


    // inside this method, Execution.notifyCheckpointComplete() is called in turn to notify all Tasks that the checkpoint has completed
    sendAcknowledgeMessages(
            pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
            checkpointId,
            completedCheckpoint.getTimestamp());
}
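
Dropping subsumed checkpoints relies only on checkpoint IDs being ordered. A minimal sketch, assuming a sorted map of pending checkpoints (MiniPendingCheckpoints is a hypothetical class, and the String value stands in for the real PendingCheckpoint object): everything strictly older than the completed ID is removed in one step. The real method may keep some older entries, for example ones that must not be subsumed; this sketch ignores such exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical registry of pending checkpoints, ordered by checkpoint ID.
class MiniPendingCheckpoints {
    private final NavigableMap<Long, String> pending = new TreeMap<>();

    void add(long checkpointId, String description) {
        pending.put(checkpointId, description);
    }

    boolean contains(long checkpointId) {
        return pending.containsKey(checkpointId);
    }

    /** Removes every pending checkpoint with an ID below completedId and returns those IDs. */
    List<Long> dropSubsumed(long completedId) {
        // headMap(..., false) is a live view of all entries strictly older than completedId.
        NavigableMap<Long, String> older = pending.headMap(completedId, false);
        List<Long> dropped = new ArrayList<>(older.keySet());
        older.clear(); // clearing the view removes the entries from 'pending' as well
        return dropped;
    }
}

class SubsumeSketch {
    public static void main(String[] args) {
        MiniPendingCheckpoints registry = new MiniPendingCheckpoints();
        registry.add(1, "cp-1");
        registry.add(2, "cp-2");
        registry.add(5, "cp-5");
        // Checkpoint 3 has just completed: 1 and 2 can no longer be useful.
        System.out.println(registry.dropSubsumed(3));
        System.out.println(registry.contains(5));
    }
}
```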
           

Some extra background

  1. Aligned-barrier and unaligned-barrier checkpoints

 [Figure 1: aligned barrier]

 [Figure 2: unaligned barrier]

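 Aligned barriers are familiar to most readers. The drawback of this first approach is that it blocks: the operator must wait until the barriers from all upstream inputs have arrived before it triggers the checkpoint. Under backpressure, this can delay the checkpoint or make it fail.

With unaligned barriers, as soon as the first barrier arrives it jumps ahead in the operator's buffered data queues (both input and output channels) and moves to the front, as the second figure shows. Even under backpressure, the barrier can therefore travel fairly smoothly from the source to the sink. The data that was overtaken, together with the data that precedes the barrier on the other input channels, is written into the snapshot. By persisting input, output, and state together, unaligned checkpoints still provide exactly-once semantics.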
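
The overtaking step can be illustrated with a single buffered channel. This is a deliberately small, hypothetical model (UnalignedChannelSketch, string records, one queue) and says nothing about how Flink lays out its network buffers: the barrier is moved to the head of the queue, and the records it jumps over are copied into the snapshot as in-flight data while remaining in the queue for normal processing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of one buffered channel under an unaligned checkpoint.
class UnalignedChannelSketch {
    static final String BARRIER = "BARRIER";

    private final Deque<String> buffer = new ArrayDeque<>();
    final List<String> persistedInFlight = new ArrayList<>(); // becomes part of the snapshot

    void enqueueRecord(String record) {
        buffer.addLast(record);
    }

    /** The barrier arrives behind the queued records and overtakes all of them. */
    void enqueueBarrierUnaligned() {
        persistedInFlight.addAll(buffer); // overtaken records are stored with the checkpoint
        buffer.addFirst(BARRIER);         // the barrier is consumed next, ahead of old data
    }

    String poll() {
        return buffer.pollFirst();
    }

    public static void main(String[] args) {
        UnalignedChannelSketch channel = new UnalignedChannelSketch();
        channel.enqueueRecord("r1");
        channel.enqueueRecord("r2");
        channel.enqueueBarrierUnaligned();
        channel.enqueueRecord("r3"); // arrives after the barrier, not persisted
        System.out.println(channel.poll() + " persisted=" + channel.persistedInFlight);
    }
}
```

The extra copy of r1 and r2 is exactly the cost discussed next: the snapshot now contains buffered data in addition to operator state.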

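Unaligned checkpoints also have some drawbacks:

  • Because buffered data has to be persisted, the state size can grow considerably and the load on disks increases.
  • As the state size grows, job recovery may take longer and operations become harder to manage

As of Flink 1.11, the official recommendation was to use unaligned checkpoints only for jobs that are prone to backpressure and have low I/O pressure (for example, jobs whose original state is not too large). I have not checked how much this has been optimized since.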

  2. How exactly-once and at-least-once relate to barrier alignment

Each Task consumes the data produced by upstream Tasks through an InputGate. CheckpointBarrierHandler sits on top of the InputGate and adds handling for events such as CheckpointBarrier. It has two implementations

  • CheckpointBarrierTracker
  • SingleCheckpointBarrierHandler

These correspond, in that order, to Flink's two modes: AT_LEAST_ONCE and EXACTLY_ONCE

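With AT_LEAST_ONCE, data is processed as soon as it arrives and the checkpoint is taken once all barriers have arrived. Data that follows an early barrier is not buffered; it flows straight on to the operator. If the job later rolls back to this checkpoint, that data is processed again, which causes duplicate processing.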

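With EXACTLY_ONCE, once a barrier arrives on an input, the data that follows it on that input is held back. It is processed only after the barriers from the other inputs have arrived and the snapshot has been taken, which guarantees that each record affects the state exactly once.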
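
The difference between the two modes shows up in what the snapshot contains. The sketch below is a hypothetical two-input operator that keeps a running sum (TwoInputSumSketch is not a Flink class, and holding records in a queue is a simplification of how a channel is blocked). With alignment, a record that follows the barrier on its channel is held back and stays out of the snapshot. Without alignment it is added immediately, so the snapshot already includes it and a replay after recovery would count it twice.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical operator with several input channels and a running sum as state.
class TwoInputSumSketch {
    private final boolean aligned;      // true: exactly-once style, false: at-least-once style
    private final int numChannels;
    private final Set<Integer> barrierSeen = new HashSet<>();
    private final Queue<Long> heldBack = new ArrayDeque<>();
    private long sum;
    long snapshot = -1;                 // state captured by the last checkpoint

    TwoInputSumSketch(boolean aligned, int numChannels) {
        this.aligned = aligned;
        this.numChannels = numChannels;
    }

    void onRecord(int channel, long value) {
        if (aligned && barrierSeen.contains(channel)) {
            heldBack.add(value);        // belongs to the next checkpoint, wait for alignment
        } else {
            sum += value;
        }
    }

    void onBarrier(int channel) {
        barrierSeen.add(channel);
        if (barrierSeen.size() == numChannels) {
            snapshot = sum;             // all barriers arrived: take the snapshot
            barrierSeen.clear();
            while (!heldBack.isEmpty()) {
                sum += heldBack.poll(); // now release the held-back records
            }
        }
    }

    long currentSum() {
        return sum;
    }

    /** Replays the same arrival order in either mode and returns the snapshot value. */
    static long run(boolean aligned) {
        TwoInputSumSketch op = new TwoInputSumSketch(aligned, 2);
        op.onRecord(0, 1);
        op.onBarrier(0);
        op.onRecord(0, 10);             // follows the barrier on channel 0
        op.onRecord(1, 2);
        op.onRecord(1, 3);
        op.onBarrier(1);
        return op.snapshot;
    }

    public static void main(String[] args) {
        System.out.println("aligned snapshot:       " + run(true));
        System.out.println("at-least-once snapshot: " + run(false));
    }
}
```

In this run the aligned snapshot is 6 (1 + 2 + 3), while the at-least-once snapshot is 16 because it already contains the post-barrier record 10.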
