检查点状态的保存
上文已经提到过barrier的流动:barrier从上游算子流向下游算子。在下游的Task中,barrier的接收委托给CheckpointBarrierHandler.getNextNonBlocked()完成,因此CheckpointBarrierHandler在收到CheckpointBarrier后可以及时触发checkpoint相关的操作。最终会调用StreamTask#performCheckpoint()方法,将该barrier继续向下游广播,并为当前Task中的算子生成状态快照。
触发checkpoint之后,对一个Task而言,最重要的就是将当前Task中所有算子的状态快照(state snapshot)存储到外部存储系统中。外部存储系统可能是一个分布式文件系统,也可能是JobManager的内存。StreamTask.performCheckpoint方法负责执行checkpoint操作,主要分为三部分:
- checkpoint的准备操作,这里通常不进行太多操作;
- 向下游发送CheckpointBarrier;
- 存储检查点快照;
class StreamTask {
    /**
     * Performs the checkpoint described by {@code checkpointMetaData} on this task.
     *
     * <p>Everything happens while holding {@code lock}. If the task is running it (1) lets the
     * operators do their pre-barrier work, (2) broadcasts the CheckpointBarrier downstream and
     * (3) takes the state snapshot of the operator chain. If the task is not running, a
     * CancelCheckpointMarker is broadcast on every stream record writer instead, so downstream
     * tasks do not wait for a barrier from this task.
     *
     * @param checkpointMetaData id and timestamp of the checkpoint
     * @param checkpointOptions checkpoint type and target location
     * @param checkpointMetrics metrics object that the snapshot phases record into
     * @return true if the checkpoint was performed, false if the task was not running
     * @throws Exception if the snapshot fails, or if sending the cancel marker failed on at least
     *     one writer (failures are collected with ExceptionUtils.firstOrSuppressed and thrown
     *     after all writers were tried)
     */
    private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics) throws Exception {
        LOG.debug("Starting checkpoint ({}) {} on task {}", checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
        synchronized (lock) {
            if (isRunning) {
                // we can do a checkpoint
                // All of the following steps happen as an atomic step from the perspective of barriers and
                // records/watermarks/timers/callbacks.
                // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                // checkpoint alignments
                // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                // The pre-barrier work should be nothing or minimal in the common case.
                // (Pre-barrier hook of the operator chain; usually a no-op.)
                operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
                // Step (2): Send the checkpoint barrier downstream
                // (broadcast the CheckpointBarrier to all downstream channels)
                operatorChain.broadcastCheckpointBarrier(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions);
                // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                // impact progress of the streaming topology
                // (snapshot the state of every operator in this task)
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                return true;
            }
            else {
                // we cannot perform our checkpoint - let the downstream operators know that they
                // should not wait for any input from this operator
                // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
                // yet be created
                final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                Exception exception = null;
                for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
                    try {
                        streamRecordWriter.broadcastEvent(message); // broadcast the CancelCheckpointMarker downstream
                    } catch (Exception e) {
                        // Keep trying the remaining writers; remember the failure.
                        exception = ExceptionUtils.firstOrSuppressed(
                                new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
                                exception);
                    }
                }
                if (exception != null) {
                    throw exception;
                }
                return false;
            }
        }
    }
}
存储检查点快照的主要逻辑封装在checkpointState()方法中,如下:
class StreamTask {
    /**
     * Takes the state snapshot of this task for one checkpoint.
     *
     * <p>First resolves the stream factory for the checkpoint's target location, then wraps the
     * whole snapshot procedure into a {@code CheckpointingOperation} and executes it.
     *
     * @param checkpointMetaData id and timestamp of the checkpoint
     * @param checkpointOptions options carrying the target storage location
     * @param checkpointMetrics metrics object the snapshot phases record into
     * @throws Exception if resolving the location or running the snapshot fails
     */
    private void checkpointState(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics) throws Exception {
        final long checkpointId = checkpointMetaData.getCheckpointId();
        // Where the checkpoint data of this task is written.
        final CheckpointStreamFactory streamFactory =
                checkpointStorage.resolveCheckpointStorageLocation(checkpointId, checkpointOptions.getTargetLocation());
        // Run the synchronous part now; it schedules the asynchronous part itself.
        new CheckpointingOperation(this, checkpointMetaData, checkpointOptions, streamFactory, checkpointMetrics)
                .executeCheckpointing();
    }
}
每一个算子的快照被抽象为OperatorSnapshotFutures,包含了operator state和keyed state的快照结果:
/**
 * In-progress snapshot results of a single operator: managed and raw keyed state, and managed
 * and raw operator state, each held as a {@link RunnableFuture} of a {@link SnapshotResult}.
 *
 * <p>NOTE(review): this excerpt shows only the fields. They are declared {@code @Nonnull}, yet
 * callers create instances with the no-arg constructor and set only some of the futures
 * (AbstractStreamOperator#snapshotState sets the managed futures only when the corresponding
 * backend exists). The omitted constructor therefore presumably initializes every field to an
 * already-completed empty result — confirm against the full class.
 */
public class OperatorSnapshotFutures {
    // Snapshot of the managed keyed state (written by the keyed state backend).
    @Nonnull
    private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;
    // Snapshot of the raw keyed state (written by the operator itself, e.g. timers).
    @Nonnull
    private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;
    // Snapshot of the managed operator state (written by the operator state backend).
    @Nonnull
    private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;
    // Snapshot of the raw operator state (written by the operator itself).
    @Nonnull
    private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;
}
检查点快照的过程被封装为CheckpointingOperation,由于每一个StreamTask可能包含多个算子,因而内部使用一个Map维护OperatorID -> OperatorSnapshotFutures的关系。CheckpointingOperation中,快照操作分为两个阶段,第一阶段是同步执行的,第二阶段是异步执行的:
class StreamTask {
    /**
     * Snapshot of all operators of this task's operator chain for one checkpoint.
     *
     * <p>Runs in two phases. The synchronous phase calls {@code StreamOperator#snapshotState} on
     * every operator and collects the returned futures per {@link OperatorID}. The asynchronous
     * phase is an {@link AsyncCheckpointRunnable} submitted to
     * {@code owner.asyncOperationsThreadPool}, which completes those futures.
     */
    private static final class CheckpointingOperation {
        // The StreamTask that owns this operation (e.g. SourceStreamTask, OneInputStreamTask).
        private final StreamTask<?, ?> owner;
        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointOptions checkpointOptions;
        private final CheckpointMetrics checkpointMetrics;
        private final CheckpointStreamFactory storageLocation;
        // All operators of the task's operator chain.
        private final StreamOperator<?>[] allOperators;
        private long startSyncPartNano;
        private long startAsyncPartNano;
        // ------------------------
        // OperatorID -> in-progress snapshot of that operator
        private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;

        /**
         * Runs the synchronous part for every operator, records its duration in
         * {@code checkpointMetrics}, and hands the collected futures to an
         * {@link AsyncCheckpointRunnable} that runs on the owner's async thread pool.
         *
         * @throws Exception as declared; the failure path is elided in this excerpt
         */
        public void executeCheckpointing() throws Exception {
            startSyncPartNano = System.nanoTime();
            try {
                // 1. Synchronous part: snapshot every operator of the chain.
                for (StreamOperator<?> op : allOperators) {
                    checkpointStreamOperator(op);
                }
                startAsyncPartNano = System.nanoTime();
                checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
                // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
                // 2. Asynchronous part.
                // Checkpoints can be configured to be synchronous or asynchronous; for synchronous
                // ones all runnable futures are already completed at this point.
                AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                        owner,
                        operatorSnapshotsInProgress,
                        checkpointMetaData,
                        checkpointMetrics,
                        startAsyncPartNano);
                owner.cancelables.registerCloseable(asyncCheckpointRunnable);
                owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
            } catch (Exception ex) {
                // Cleanup to release resources (elided in this excerpt)
                ........
            }
        }

        /**
         * Takes the synchronous snapshot of one operator and records the returned futures under
         * the operator's ID. A {@code null} entry in the chain is skipped.
         *
         * <p>This method belongs to CheckpointingOperation, not to the enclosing StreamTask: it
         * reads {@code checkpointMetaData}, {@code checkpointOptions}, {@code storageLocation} and
         * {@code operatorSnapshotsInProgress}, which are fields of this nested class.
         *
         * @param op operator to snapshot, may be {@code null}
         * @throws Exception if the operator's snapshot fails
         */
        @SuppressWarnings("deprecation")
        private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
            if (null != op) {
                // StreamOperator.snapshotState returns runnable futures which may or may not
                // have completed yet.
                OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions,
                        storageLocation);
                operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
            }
        }
    }
}
同步执行阶段
在同步执行阶段,会依次调用每一个算子的StreamOperator.snapshotState,返回结果是一个runnable future。根据checkpoint配置为同步模式还是异步模式,这个future可能已经完成,也可能尚未完成:
interface StreamOperator<OUT> {
    /**
     * Called to draw a state snapshot from the operator.
     *
     * @param checkpointId ID of the checkpoint being taken
     * @param timestamp timestamp of the checkpoint
     * @param checkpointOptions options of the checkpoint
     * @param storageLocation factory for the checkpoint output streams the state is written to
     * @return a runnable future to the state handle that points to the snapshotted state. For synchronous implementations,
     * the runnable might already be finished.
     *
     * @throws Exception exception that happened during snapshotting.
     */
    OperatorSnapshotFutures snapshotState(
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory storageLocation) throws Exception;
}
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, Serializable {
    /**
     * Takes the snapshot of this operator and returns its in-progress futures.
     *
     * <p>First calls the overridable {@code snapshotState(StateSnapshotContext)} hook, which writes
     * the raw state, and takes the raw keyed/operator state futures from the context. Then it
     * requests the managed operator state snapshot and the managed keyed state snapshot from the
     * respective backends, if they exist.
     *
     * <p>On failure, the already created futures are cancelled (cancel failures are added as
     * suppressed). The failure is logged only when the containing task is not canceled, and it is
     * rethrown wrapped in an Exception.
     */
    @Override
    public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory) throws Exception {
        // Operators without a keyed backend snapshot an empty key-group range.
        KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
        try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
                checkpointId,
                timestamp,
                factory,
                keyGroupRange,
                getContainingTask().getCancelables())) {
            // Snapshot hook: subclasses write their raw state here.
            snapshotState(snapshotContext);
            // Raw state must be written by the operator itself through the context; the base
            // hook may write timers as raw keyed state.
            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
            // Managed operator state snapshot.
            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                        operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
            // Managed keyed state snapshot.
            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                        keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        } catch (Exception snapshotException) {
            // Release whatever was started, keeping the original failure as the primary exception.
            try {
                snapshotInProgress.cancel();
            } catch (Exception e) {
                snapshotException.addSuppressed(e);
            }
            String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " + getOperatorName() + ".";
            if (!getContainingTask().isCanceled()) {
                LOG.info(snapshotFailMessage, snapshotException);
            }
            throw new Exception(snapshotFailMessage, snapshotException);
        }
        return snapshotInProgress;
    }
}
上文对每个算子调用的AbstractStreamOperator#snapshotState(snapshotContext)的默认实现如下(当keyed state backend要求同步快照timer时,它会把timer作为raw keyed state写入):
/**
 * Snapshot hook for operators with state; stateful operators that take part in a snapshot
 * override it.
 *
 * <p>The base implementation only does work when the keyed state backend is an
 * {@link AbstractKeyedStateBackend} whose {@code requiresLegacySynchronousTimerSnapshots()} is
 * true. In that case it writes the timers of every key group of the raw keyed state stream as
 * raw keyed state, and then closes the stream.
 *
 * @param context context that provides information and means required for taking a snapshot
 * @throws Exception if the raw keyed state stream cannot be opened or the timers cannot be written
 */
public void snapshotState(StateSnapshotContext context) throws Exception {
    final KeyedStateBackend<?> backend = getKeyedStateBackend();
    //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
    final boolean writeTimersAsRawKeyedState = backend instanceof AbstractKeyedStateBackend
            && ((AbstractKeyedStateBackend<?>) backend).requiresLegacySynchronousTimerSnapshots();
    if (!writeTimersAsRawKeyedState) {
        return;
    }

    final KeyedStateCheckpointOutputStream rawKeyedOut;
    try {
        rawKeyedOut = context.getRawKeyedOperatorStateOutput();
    } catch (Exception openFailure) {
        throw new Exception("Could not open raw keyed operator state stream for " + getOperatorName() + '.', openFailure);
    }

    try {
        // One section per key group: mark its start, then write that key group's timers.
        for (int keyGroup : rawKeyedOut.getKeyGroupList()) {
            rawKeyedOut.startNewKeyGroup(keyGroup);
            timeServiceManager.snapshotStateForKeyGroup(new DataOutputViewStreamWrapper(rawKeyedOut), keyGroup);
        }
    } catch (Exception writeFailure) {
        throw new Exception("Could not write timer service of " + getOperatorName() +
                " to checkpoint state stream.", writeFailure);
    } finally {
        // A close failure is only logged, so that it does not mask a write failure.
        try {
            rawKeyedOut.close();
        } catch (Exception closeFailure) {
            LOG.warn("Could not close raw keyed operator state stream for {}. This " +
                    "might have prevented deleting some state data.", getOperatorName(), closeFailure);
        }
    }
}
其他StreamOperator子类都继承自AbstractStreamOperator,例如AbstractUdfStreamOperator。它对checkpoint快照的处理如下:
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {
    /**
     * Snapshots the operator via the base class first, then the state of the user function.
     *
     * <p>The user function is handled by {@code StreamingFunctionUtils#snapshotFunctionState}. It
     * dispatches with {@code instanceof} checks on {@code CheckpointedFunction} and
     * {@code ListCheckpointed}, unwrapping {@code WrappingFunction}s as needed. No reflection is
     * involved.
     *
     * @param context snapshot context of the current checkpoint
     * @throws Exception if the base snapshot or the user function's snapshot fails
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        // Base class first (AbstractStreamOperator#snapshotState), which may write timers as raw keyed state.
        super.snapshotState(context);
        // Then the user function's state, if it implements one of the checkpointing interfaces.
        StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
    }
}
public final class StreamingFunctionUtils {
    /**
     * Snapshots the state of a user function.
     *
     * <p>If {@code userFunction} does not implement a supported checkpointing interface but is a
     * {@link WrappingFunction}, its wrapped function is tried next. This repeats until one layer is
     * snapshotted or a non-wrapping function is reached.
     *
     * @param context snapshot context of the current checkpoint (checked non-null)
     * @param backend operator state backend used for {@code ListCheckpointed} functions (checked non-null)
     * @param userFunction the user function, possibly wrapped
     * @throws Exception if the user function's snapshot fails
     */
    public static void snapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {
        Preconditions.checkNotNull(context);
        Preconditions.checkNotNull(backend);
        Function candidate = userFunction;
        // Peel off WrappingFunction layers until one of them handles the snapshot.
        while (!trySnapshotFunctionState(context, backend, candidate)
                && candidate instanceof WrappingFunction) {
            candidate = ((WrappingFunction<?>) candidate).getWrappedFunction();
        }
    }

    /**
     * Tries to snapshot {@code userFunction} itself (without unwrapping).
     *
     * <ul>
     *   <li>{@link CheckpointedFunction}: its {@code snapshotState(context)} is invoked.</li>
     *   <li>{@link ListCheckpointed}: the list it returns replaces the contents of the backend's
     *       default serializable list state. If adding an element fails, the list state is
     *       cleared and the failure is rethrown.</li>
     * </ul>
     *
     * @return true if the function implements one of the two interfaces, false otherwise
     */
    private static boolean trySnapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {
        if (userFunction instanceof CheckpointedFunction) {
            ((CheckpointedFunction) userFunction).snapshotState(context);
            return true;
        }
        if (!(userFunction instanceof ListCheckpointed)) {
            return false;
        }

        @SuppressWarnings("unchecked")
        final ListCheckpointed<Serializable> listCheckpointed = (ListCheckpointed<Serializable>) userFunction;
        final List<Serializable> currentState =
                listCheckpointed.snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
        final ListState<Serializable> defaultListState =
                backend.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
        // Replace whatever the backend held before with the function's current state.
        defaultListState.clear();
        if (currentState == null) {
            return true;
        }
        try {
            for (Serializable element : currentState) {
                defaultListState.add(element);
            }
        } catch (Exception e) {
            defaultListState.clear();
            throw new Exception("Could not write partitionable state to operator " +
                    "state backend.", e);
        }
        return true;
    }
}
从上面的分析可以知道checkpoint操作是如何与用户自定义函数建立关联的。接下来看看由Flink托管的状态(managed state)是如何写入存储系统的,即:
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); // writes the managed operator state
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); // writes the managed keyed state
首先来看operatorStateBackend.snapshot()。operatorStateBackend是在AbstractStreamOperator#initializeState()中初始化的,具体的实例化委托给了StreamTaskStateInitializerImpl#streamOperatorStateContext()方法。该方法最终分别调用stateBackend.createOperatorStateBackend(environment, operatorIdentifierText)和stateBackend.createKeyedStateBackend(...),由具体的状态后端实现stateBackend创建对应的operatorStateBackend和keyedStateBackend实例:
- operatorStateBackend对应的实现类为DefaultOperatorStateBackend;
- keyedStateBackend会根据其具体的状态存储器实例创建不同的实现类,如:
- FsStateBackend对应HeapKeyedStateBackend;
- MemoryStateBackend对应HeapKeyedStateBackend;
- RocksDBStateBackend对应RocksDBKeyedStateBackend;
1、DefaultOperatorStateBackend将实际的工作交给DefaultOperatorStateBackendSnapshotStrategy完成。首先,它会对当前注册的所有operator state(包括list state和broadcast state)做深拷贝,然后将实际的写入操作封装在一个异步的FutureTask中,这个FutureTask的主要任务包括:
- 打开输出流
- 写入状态元数据信息
- 写入状态
- 关闭输出流,获得状态句柄
如果不启用异步checkpoint模式,那么这个FutureTask在同步阶段就会立刻执行。
2、keyed state写入的基本流程与此相似,但keyed state有多种存储实现,包括基于堆内存和基于RocksDB的实现,且基于RocksDB的实现还支持增量checkpoint,因而比operator state更复杂一些。另外,Flink自1.5.0版本起引入了本地状态存储的优化,支持在TaskManager本地额外保存一份keyed state,以加快状态恢复并减少网络开销。
public class DefaultOperatorStateBackend implements OperatorStateBackend {
    /**
     * Snapshots all registered operator states (list states and broadcast states).
     *
     * <p>The work is delegated to {@code snapshotStrategy}. This method only measures the start
     * of the synchronous part and lets the strategy log its completion.
     *
     * @return the runnable future that produces the snapshot result
     */
    @Override
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        final long syncPartStartMillis = System.currentTimeMillis();
        final RunnableFuture<SnapshotResult<OperatorStateHandle>> pendingSnapshot =
                snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
        snapshotStrategy.logSyncCompleted(streamFactory, syncPartStartMillis);
        return pendingSnapshot;
    }
}
/**
 * Snapshot strategy for this backend.
 *
 * <p>The synchronous part deep-copies every registered list state and broadcast state. The
 * writing is wrapped in an {@link AsyncSnapshotCallable}. When run, it opens one checkpoint
 * output stream, writes the meta info of all copied states, then the states themselves, and
 * finally closes the stream to obtain an {@link OperatorStreamStateHandle}.
 */
private class DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy<OperatorStateHandle> {
    // Inner-class constructor; passes the description string to AbstractSnapshotStrategy.
    protected DefaultOperatorStateBackendSnapshotStrategy() {
        super("DefaultOperatorStateBackend snapshot");
    }

    /**
     * Creates the snapshot of all currently registered operator states.
     *
     * @return {@code DoneFuture.of(SnapshotResult.empty())} if no list state and no broadcast
     *     state is registered; otherwise the snapshot FutureTask. When
     *     {@code asynchronousSnapshots} is false, that task has already been run on the calling
     *     thread.
     */
    @Nonnull
    @Override
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot( // performs the actual snapshot
            final long checkpointId,
            final long timestamp,
            @Nonnull final CheckpointStreamFactory streamFactory,
            @Nonnull final CheckpointOptions checkpointOptions) throws IOException {
        // Nothing registered: nothing to write.
        if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
            return DoneFuture.of(SnapshotResult.empty());
        }
        final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies = new HashMap<>(registeredOperatorStates.size());
        final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies = new HashMap<>(registeredBroadcastStates.size());
        // The deep copies are made with the user class loader installed as the thread's context
        // class loader; the previous loader is restored in the finally block.
        ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(userClassloader);
        try {
            // eagerly create deep copies of the list and the broadcast states (if any)
            // in the synchronous phase, so that we can use them in the async writing.
            // (null entries are copied over as null)
            if (!registeredOperatorStates.isEmpty()) {
                for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
                    PartitionableListState<?> listState = entry.getValue();
                    if (null != listState) {
                        listState = listState.deepCopy();
                    }
                    registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
                }
            }
            if (!registeredBroadcastStates.isEmpty()) {
                for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
                    BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                    if (null != broadcastState) {
                        broadcastState = broadcastState.deepCopy();
                    }
                    registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
                }
            }
        } finally {
            Thread.currentThread().setContextClassLoader(snapshotClassLoader);
        }
        // Wrap the actual writing into an AsyncSnapshotCallable (turned into a FutureTask below).
        AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
                new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {
                    @Override
                    protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {
                        // Open the checkpoint output stream and register it via registerCloseableForCancellation.
                        CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                                streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
                        registerCloseableForCancellation(localOut);
                        // get the registered operator state infos ...
                        // (collect the meta info snapshots of the copied list states)
                        List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots = new ArrayList<>(registeredOperatorStatesDeepCopies.size());
                        for (Map.Entry<String, PartitionableListState<?>> entry :
                                registeredOperatorStatesDeepCopies.entrySet()) {
                            operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                        }
                        // ... get the registered broadcast operator state infos ...
                        List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots = new ArrayList<>(registeredBroadcastStatesDeepCopies.size());
                        for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                                registeredBroadcastStatesDeepCopies.entrySet()) {
                            broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                        }
                        // ... write them all in the checkpoint stream ...
                        // (the meta info is written first, through the serialization proxy)
                        DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
                        OperatorBackendSerializationProxy backendSerializationProxy =
                                new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
                        backendSerializationProxy.write(dov);
                        // ... and then go for the states ...
                        // we put BOTH normal and broadcast state metadata here
                        // (for every state, the offsets returned by write() and its assignment mode
                        // are recorded under the state name)
                        int initialMapCapacity = registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
                        final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData = new HashMap<>(initialMapCapacity);
                        for (Map.Entry<String, PartitionableListState<?>> entry :
                                registeredOperatorStatesDeepCopies.entrySet()) {
                            PartitionableListState<?> value = entry.getValue();
                            long[] partitionOffsets = value.write(localOut);
                            OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                            writtenStatesMetaData.put(
                                    entry.getKey(),
                                    new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                        }
                        // ... and the broadcast states themselves ...
                        // (a broadcast state yields a single offset)
                        for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                                registeredBroadcastStatesDeepCopies.entrySet()) {
                            BackendWritableBroadcastState<?, ?> value = entry.getValue();
                            long[] partitionOffsets = {value.write(localOut)};
                            OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                            writtenStatesMetaData.put(
                                    entry.getKey(),
                                    new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                        }
                        // ... and, finally, create the state handle.
                        // Only if the stream was still registered: close it and obtain the handle
                        // used later to read the state back. If it was already unregistered, fail.
                        OperatorStateHandle retValue = null;
                        if (unregisterCloseableFromCancellation(localOut)) {
                            StreamStateHandle stateHandle = localOut.closeAndGetHandle();
                            if (stateHandle != null) {
                                retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
                            }
                            return SnapshotResult.of(retValue);
                        } else {
                            throw new IOException("Stream was already unregistered.");
                        }
                    }
                    @Override
                    protected void cleanupProvidedResources() {
                        // nothing to do
                    }
                    @Override
                    protected void logAsyncSnapshotComplete(long startTime) {
                        if (asynchronousSnapshots) {
                            logAsyncCompleted(streamFactory, startTime);
                        }
                    }
                };
        final FutureTask<SnapshotResult<OperatorStateHandle>> task =
                snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);
        // Synchronous snapshots: run the task right here, so the state is fully written in the
        // synchronous phase.
        if (!asynchronousSnapshots) {
            task.run();
        }
        return task;
    }
}
异步执行阶段
在CheckpointingOperation#executeCheckpointing()中,异步执行阶段被封装为AsyncCheckpointRunnable,主要操作包括:
- 执行同步阶段创建的FutureTask
- 完成后向JobMaster中的CheckpointCoordinator发送Ack响应
class StreamTask {
    /**
     * Asynchronous part of a checkpoint.
     *
     * <p>Completes every operator's {@link OperatorSnapshotFutures}. For each operator it keeps
     * the state owned by the JobManager and the task-local state separately, and records the
     * async duration. It then reports the snapshot, but only if the state can still be switched
     * from RUNNING to COMPLETED (i.e. it was not closed before).
     */
    protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {
        @Override
        public void run() {
            FileSystemSafetyNet.initializeSafetyNetForThread();
            try {
                // Collected snapshots: the state acknowledged to the JobManager and the task-local state.
                TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
                        new TaskStateSnapshot(operatorSnapshotsInProgress.size());
                TaskStateSnapshot localTaskOperatorSubtaskStates =
                        new TaskStateSnapshot(operatorSnapshotsInProgress.size());
                // Complete the state writing of every operator. For synchronous checkpoints the
                // state has already been written; for asynchronous ones it is written here.
                for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
                    OperatorID operatorID = entry.getKey();
                    OperatorSnapshotFutures snapshotInProgress = entry.getValue();
                    // finalize the async part of all by executing all snapshot runnables
                    OperatorSnapshotFinalizer finalizedSnapshots =
                            new OperatorSnapshotFinalizer(snapshotInProgress);
                    jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                            operatorID,
                            finalizedSnapshots.getJobManagerOwnedState());
                    localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                            operatorID,
                            finalizedSnapshots.getTaskLocalState());
                }
                final long asyncEndNanos = System.nanoTime();
                final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;
                checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
                if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
                        CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
                    // Report the completed snapshot.
                    reportCompletedSnapshotStates(
                            jobManagerTaskOperatorSubtaskStates,
                            localTaskOperatorSubtaskStates,
                            asyncDurationMillis);
                } else {
                    LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                            owner.getName(),
                            checkpointMetaData.getCheckpointId());
                }
            } catch (Exception e) {
                handleExecutionException(e);
            } finally {
                owner.cancelables.unregisterCloseable(this);
                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
            }
        }

        /**
         * Reports the completed snapshot through the task's TaskStateManager. That manager
         * acknowledges the checkpoint to the CheckpointCoordinator in the JobMaster.
         *
         * <p>This method belongs to AsyncCheckpointRunnable, not to the enclosing StreamTask: it
         * reads {@code owner}, {@code checkpointMetaData} and {@code checkpointMetrics}, which are
         * fields of this nested class.
         *
         * <p>NOTE(review): {@code asyncDurationMillis} is not used in this excerpt.
         *
         * @param acknowledgedTaskStateSnapshot state owned by the JobManager
         * @param localTaskStateSnapshot task-local copy of the state
         * @param asyncDurationMillis duration of the asynchronous part
         */
        private void reportCompletedSnapshotStates(
                TaskStateSnapshot acknowledgedTaskStateSnapshot,
                TaskStateSnapshot localTaskStateSnapshot,
                long asyncDurationMillis) {
            TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
            boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
            boolean hasLocalState = localTaskStateSnapshot.hasState();
            // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
            // to stateless tasks on restore. This enables simple job modifications that only concern
            // stateless without the need to assign them uids to match their (always empty) states.
            taskStateManager.reportTaskStateSnapshots(
                    checkpointMetaData,
                    checkpointMetrics,
                    hasAckState ? acknowledgedTaskStateSnapshot : null,
                    hasLocalState ? localTaskStateSnapshot : null);
        }
    }
}
public class TaskStateManagerImpl implements TaskStateManager {
    /**
     * Handles the state snapshot a task reports for a completed checkpoint.
     *
     * <p>The task-local state is handed to {@code localStateStore} first. Then the checkpoint is
     * acknowledged through {@code checkpointResponder} with the state meant for the JobManager.
     *
     * @param checkpointMetaData id and timestamp of the checkpoint
     * @param checkpointMetrics metrics collected while taking the checkpoint
     * @param acknowledgedState state for the JobManager; null for stateless tasks
     * @param localState task-local copy of the state; null if there is none
     */
    @Override
    public void reportTaskStateSnapshots(
            @Nonnull CheckpointMetaData checkpointMetaData,
            @Nonnull CheckpointMetrics checkpointMetrics,
            @Nullable TaskStateSnapshot acknowledgedState,
            @Nullable TaskStateSnapshot localState) {
        long checkpointId = checkpointMetaData.getCheckpointId();
        // Keep the task-local copy of the state in the local state store.
        localStateStore.storeLocalState(checkpointId, localState);
        // Send the ACK for this checkpoint to the CheckpointCoordinator (an RPC through the
        // checkpoint coordinator gateway), carrying the state for the JobManager.
        checkpointResponder.acknowledgeCheckpoint(
                jobId,
                executionAttemptID,
                checkpointId,
                checkpointMetrics,
                acknowledgedState);
    }
}
本地状态存储
所谓本地状态存储,是指在存储检查点快照时,同时在Task所在TaskManager的本地文件系统中保存一份副本,这样在状态恢复时可以优先从本地恢复,从而减少网络数据传输的开销。本地状态存储仅针对keyed state。以较为简单的HeapKeyedStateBackend为例,其本地状态存储实现如下:
public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
    /**
     * Base class for the snapshots of the heap backend that outlines the algorithm and offers some hooks to realize
     * the concrete strategies. Subclasses must be threadsafe.
     */
    private class HeapSnapshotStrategy extends AbstractSnapshotStrategy<KeyedStateHandle> implements SnapshotStrategySynchronicityBehavior<K> {
        private final SnapshotStrategySynchronicityBehavior<K> snapshotStrategySynchronicityTrait;

        /**
         * Snapshot of the heap keyed state (excerpt; most of the method is elided).
         *
         * <p>The part shown prepares the serialization proxy for the key serializer and state meta
         * info. It also prepares a supplier for the checkpoint output stream. With local recovery
         * enabled the supplier creates a duplicating stream (primary storage plus a local copy);
         * otherwise it creates a primary-only stream.
         */
        @Nonnull
        @Override
        public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
                long checkpointId,
                long timestamp,
                @Nonnull CheckpointStreamFactory primaryStreamFactory,
                @Nonnull CheckpointOptions checkpointOptions) throws IOException {
            ......
            // Serialization proxy for the key serializer and the state meta info snapshots; the
            // last argument tells whether key-group compression is in use.
            final KeyedBackendSerializationProxy<K> serializationProxy =
                    new KeyedBackendSerializationProxy<>(
                            // TODO: this code assumes that writing a serializer is threadsafe, we should support to
                            // get a serialized form already at state registration time in the future
                            keySerializer,
                            metaInfoSnapshots,
                            !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator));
            // Supplier of the CheckpointStreamWithResultProvider; the stream is only created when
            // the supplier is invoked.
            final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier =
                    localRecoveryConfig.isLocalRecoveryEnabled() ?
                            () -> CheckpointStreamWithResultProvider.createDuplicatingStream(
                                    checkpointId,
                                    CheckpointedStateScope.EXCLUSIVE,
                                    primaryStreamFactory,
                                    localRecoveryConfig.getLocalStateDirectoryProvider()) :
                            () -> CheckpointStreamWithResultProvider.createSimpleStream(
                                    CheckpointedStateScope.EXCLUSIVE,
                                    primaryStreamFactory);
            //--------------------------------------------------- this becomes the end of sync part
            ........
        }
    }
}
其中关键的一点在于,根据是否启用本地状态恢复来创建不同的CheckpointStreamWithResultProvider:
public interface CheckpointStreamWithResultProvider extends Closeable {
    /**
     * Creates a provider that writes only to the primary stream, i.e. the checkpoint storage.
     *
     * @throws IOException if the primary stream cannot be created
     */
    @Nonnull
    static CheckpointStreamWithResultProvider createSimpleStream(
            @Nonnull CheckpointedStateScope checkpointedStateScope,
            @Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException {
        return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(
                primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope));
    }

    /**
     * Creates a provider that writes to the primary stream and, in addition, to a secondary file.
     * The file gets a random name inside the subtask-specific local directory of the checkpoint.
     *
     * <p>If opening the secondary (local) stream fails with an IOException, a warning is logged
     * and a primary-only provider is returned instead.
     *
     * @throws IOException if the primary stream cannot be created
     */
    @Nonnull
    static CheckpointStreamWithResultProvider createDuplicatingStream(
            @Nonnegative long checkpointId,
            @Nonnull CheckpointedStateScope checkpointedStateScope,
            @Nonnull CheckpointStreamFactory primaryStreamFactory,
            @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider) throws IOException {
        // Stream to the checkpoint storage.
        final CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
                primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
        try {
            // Stream to a local file used for local recovery.
            final File localCheckpointDir = secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(checkpointId);
            final Path localPath = new Path(new File(localCheckpointDir, String.valueOf(UUID.randomUUID())).toURI());
            final CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
                    new FileBasedStateOutputStream(localPath.getFileSystem(), localPath);
            return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(primaryOut, secondaryOut);
        } catch (IOException secondaryEx) {
            LOG.warn("Exception when opening secondary/local checkpoint output stream. " +
                    "Continue only with the primary stream.", secondaryEx);
            return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
        }
    }
}
所以在启用本地状态存储的情况下,会创建两个输出流:primaryOut对应外部存储,secondaryOut对应本地存储,状态会同时写出两份。如果本地输出流(secondaryOut)创建失败,只会记录一条警告日志,并退化为只写primaryOut。本地状态句柄会存储在TaskLocalStateStore中。
检查点状态的恢复
当Flink作业失败重启或者从指定的SavePoint启动时,需要将整个作业恢复到最近一次成功的checkpoint(或指定SavePoint)的状态。这里主要分为两个阶段:
- CheckpointCoordinator加载最近一次成功的CompletedCheckpoint,并将状态重新分配到不同的Execution(Task)中
- Task启动时进行状态初始化
状态分配
首先,JobMaster在创建ExecutionGraph后会尝试恢复状态到最近一次成功的checkpoint,或者加载SavePoint,最终都会调用CheckpointCoordinator.restoreLatestCheckpointedState()方法:
public class CheckpointCoordinator {
    /**
     * Restores the latest checkpointed state.
     *
     * <p>Excerpt: under {@code lock}, takes the latest CompletedCheckpoint from
     * {@code completedCheckpointStore} and re-assigns its operator states to the given tasks
     * through a StateAssignmentOperation. It then lets the master hooks restore their state
     * from the same checkpoint. The parts marked {@code .........} (including the return
     * value) are elided.
     *
     * @param tasks job vertices whose tasks receive the restored state
     * @param errorIfNoCheckpoint used in the elided part — presumably to fail when no checkpoint exists; confirm
     * @param allowNonRestoredState passed to the state assignment and the master hooks
     */
    public boolean restoreLatestCheckpointedState(
            Map<JobVertexID, ExecutionJobVertex> tasks,
            boolean errorIfNoCheckpoint,
            boolean allowNonRestoredState) throws Exception {
        synchronized (lock) {
            // .........
            // Restore from the latest checkpoint
            CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
            // re-assign the task states
            final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
            StateAssignmentOperation stateAssignmentOperation =
                    new StateAssignmentOperation(latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState);
            stateAssignmentOperation.assignStates();
            // call master hooks for restore
            MasterHooks.restoreMasterHooks(
                    masterHooks,
                    latest.getMasterHookStates(),
                    latest.getCheckpointID(),
                    allowNonRestoredState,
                    LOG);
            //.........
        }
    }
}
最终,每个Task分配的状态被封装在JobManagerTaskRestore中,并通过Execution.setInitialState()关联到Execution中。JobManagerTaskRestore会作为TaskDeploymentDescriptor的一个属性下发到TaskExecutor中。
Task状态初始化
TaskDeploymentDescriptor被提交给TaskExecutor之后,TaskExecutor会创建TaskStateManager来管理当前Task的状态。TaskStateManager基于分配到的JobManagerTaskRestore和本地状态存储TaskLocalStateStore创建:
class TaskExecutor {
    /**
     * Accepts a task deployment (excerpt; only the state-related part is shown).
     *
     * <p>Looks up the task-local state store for this subtask and takes the JobManagerTaskRestore
     * from the deployment descriptor. Both are passed to the TaskStateManagerImpl created for the
     * task. Creating and starting the task itself is elided.
     */
    @Override
    public CompletableFuture<Acknowledge> submitTask(
            TaskDeploymentDescriptor tdd,
            JobMasterId jobMasterId,
            Time timeout) {
        // ..........
        // Task-local state store, looked up by job, allocation, job vertex and subtask index.
        final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask(
                jobId,
                tdd.getAllocationId(),
                taskInformation.getJobVertexId(),
                tdd.getSubtaskIndex());
        // State assigned by the JobManager for restore.
        // NOTE(review): presumably null when there is nothing to restore — confirm.
        final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
        // Create the TaskStateManager
        final TaskStateManager taskStateManager = new TaskStateManagerImpl(
                jobId,
                tdd.getExecutionAttemptId(),
                localStateStore,
                taskRestore,
                checkpointResponder);
        // Create and start the Task
        // .........
    }
}
在Task启动后,StreamTask会先调用initializeState方法,这样每一个算子都会调用StreamOperator.initializeState()进行状态的初始化:
public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, Serializable {
    /**
     * Initializes (and, when restoring, recovers) the state of this operator.
     *
     * <p>Obtains a StreamOperatorStateContext from the task's StreamTaskStateInitializer. From it
     * the operator takes its operator state backend, keyed state backend (if any) and internal
     * timer service manager. It then calls the overridable
     * {@code initializeState(StateInitializationContext)} hook with access to those backends and
     * to the raw state input streams. Afterwards the raw operator and raw keyed state inputs are
     * closed, in that order.
     *
     * @throws Exception if creating the context or the initialization hook fails
     */
    @Override
    public final void initializeState() throws Exception {
        final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
        final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());
        final CloseableRegistry streamTaskCloseableRegistry = Preconditions.checkNotNull(containingTask.getCancelables());
        final StreamTaskStateInitializer streamTaskStateManager = Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
        // Creating the StreamOperatorStateContext performs the state restore, so that
        // operatorStateBackend and keyedStateBackend reflect the last checkpoint;
        // the timeServiceManager is restored as well.
        final StreamOperatorStateContext context =
                streamTaskStateManager.streamOperatorStateContext(
                        getOperatorID(),
                        getClass().getSimpleName(),
                        this,
                        keySerializer,
                        streamTaskCloseableRegistry,
                        metrics);
        this.operatorStateBackend = context.operatorStateBackend();
        this.keyedStateBackend = context.keyedStateBackend();
        // A keyed state store only exists for operators with a keyed backend.
        if (keyedStateBackend != null) {
            this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
        }
        timeServiceManager = context.internalTimerServiceManager();
        CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
        try {
            // StateInitializationContext exposes the state backends and the raw state streams;
            // the operator uses it to initialize its state.
            StateInitializationContext initializationContext = new StateInitializationContextImpl(
                    context.isRestored(), // information whether we restore or start for the first time
                    operatorStateBackend, // access to operator state backend
                    keyedStateStore, // access to keyed state backend
                    keyedStateInputs, // access to keyed state stream
                    operatorStateInputs); // access to operator state stream
            // State initialization hook implemented by subclasses (e.g. restoring UDF state).
            initializeState(initializationContext);
        } finally {
            closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
            closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
        }
    }

    /**
     * Stream operators with state which can be restored need to override this hook method.
     */
    public void initializeState(StateInitializationContext context) throws Exception {
    }
}
/**
 * Excerpt: operators that wrap a user function additionally restore that function's state.
 */
public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {

    /**
     * Runs the base-class state initialization first, then lets the wrapped user function
     * initialize/restore its own state from the same context.
     *
     * @param context access to the state backends and the raw state streams
     * @throws Exception if either step fails
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        // Delegate the user function's state initialization to the shared utility.
        StreamingFunctionUtils.restoreFunctionState(context, this.userFunction);
    }
}
状态恢复的关键在于通过StreamTaskStateInitializer.streamOperatorStateContext()生成StreamOperatorStateContext;通过StreamOperatorStateContext可以获取state backend、timer service manager等:
/**
 * Context handed to a stream operator while its state is initialized: it exposes the (possibly
 * restored) state backends, the internal timer service manager and the raw state input streams.
 */
public interface StreamOperatorStateContext {
/** Returns true if the states provided by this context are restored from a checkpoint/savepoint. */
boolean isRestored();
/** Returns the operator state backend for the stream operator. */
OperatorStateBackend operatorStateBackend();
/** Returns the keyed state backend for the stream operator; returns null for non-keyed operators. */
AbstractKeyedStateBackend<?> keyedStateBackend();
/**
 * Returns the internal timer service manager for the stream operator; returns null for non-keyed
 * operators.
 */
InternalTimeServiceManager<?> internalTimerServiceManager();
/**
 * Returns an iterable to obtain input streams for previously stored (raw) operator state
 * partitions that are assigned to this stream operator.
 */
CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs();
/**
 * Returns an iterable to obtain input streams for previously stored (raw) keyed state partitions
 * that are assigned to this operator; returns null for non-keyed operators.
 */
CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs();
}
为了生成StreamOperatorStateContext,首先要通过TaskStateManager.prioritizedOperatorState()方法获得每个Operator需要恢复的状态句柄;然后使用这些状态句柄创建并恢复state backend和timer service。
这里引入了PrioritizedOperatorSubtaskState,它封装了多个备选的OperatorSubtaskState(快照),这些快照之间可以(部分)相互替换,并按照优先级排序。列表中的最后一项是从JobManager获取的状态快照,它包含了该子任务的全部状态,但优先级最低;排在它前面的备选项(例如TaskManager本地保存的状态快照)优先级更高。在进行状态恢复时,会优先从高优先级的状态句柄中读取状态:
/**
 * Excerpt: for each kind of state (managed/raw, operator/keyed) keeps a priority-ordered list of
 * alternative snapshots that can be used for restore. Each list ends with the state obtained from
 * the JobManager (the ground truth), which has the lowest priority.
 */
public class PrioritizedOperatorSubtaskState {

    /** List of prioritized snapshot alternatives for managed operator state. */
    private final List<StateObjectCollection<OperatorStateHandle>> prioritizedManagedOperatorState;

    /** List of prioritized snapshot alternatives for raw operator state. */
    private final List<StateObjectCollection<OperatorStateHandle>> prioritizedRawOperatorState;

    /** List of prioritized snapshot alternatives for managed keyed state. */
    private final List<StateObjectCollection<KeyedStateHandle>> prioritizedManagedKeyedState;

    /** List of prioritized snapshot alternatives for raw keyed state. */
    private final List<StateObjectCollection<KeyedStateHandle>> prioritizedRawKeyedState;

    public static class Builder {

        /**
         * Resolves the dependencies between the ground truth of the operator state obtained from the
         * job manager and potential alternatives for recovery, e.g. from a task-local source.
         *
         * @param jobManagerState the ground-truth state reported by the job manager
         * @param alternativesByPriority candidate alternatives, highest priority first; may be null
         * @param approveFun decides whether a candidate handle may stand in for the ground-truth handle
         * @return an unmodifiable list of approved alternatives in priority order that always ends
         *     with {@code jobManagerState}; just {@code [jobManagerState]} when nothing can be resolved
         */
        protected <T extends StateObject> List<StateObjectCollection<T>> resolvePrioritizedAlternatives(
                StateObjectCollection<T> jobManagerState,
                List<StateObjectCollection<T>> alternativesByPriority,
                BiFunction<T, T, Boolean> approveFun) {

            // Nothing to resolve if there are no alternatives, if the ground truth holds no state, or
            // if it holds more than one handle (which suggests a rescaling; mostly a sanity check).
            final boolean noAlternatives =
                    alternativesByPriority == null || alternativesByPriority.isEmpty();
            if (noAlternatives || !jobManagerState.hasState() || jobManagerState.size() != 1) {
                return Collections.singletonList(jobManagerState);
            }

            // Exactly one handle at this point: every candidate is compared against it.
            final T groundTruth = jobManagerState.iterator().next();

            // Sized for the worst case: every candidate approved, plus the ground truth.
            final List<StateObjectCollection<T>> approved =
                    new ArrayList<>(alternativesByPriority.size() + 1);

            for (StateObjectCollection<T> candidate : alternativesByPriority) {
                // Only a non-empty 1:1 candidate can replace the ground truth.
                if (candidate == null || !candidate.hasState() || candidate.size() != 1) {
                    continue;
                }
                if (Boolean.TRUE.equals(approveFun.apply(groundTruth, candidate.iterator().next()))) {
                    approved.add(candidate);
                }
            }

            // The state from the JobManager is always included, as the lowest-priority alternative.
            approved.add(jobManagerState);
            return Collections.unmodifiableList(approved);
        }
    }
}
/**
 * Excerpt: resolves, per operator, which state snapshots should be used for restore.
 */
public class TaskStateManagerImpl implements TaskStateManager {

    /**
     * Returns the prioritized restore state of one operator. The snapshot assigned by the JobManager
     * is the ground truth; a task-local snapshot of the same checkpoint, if present, is offered as a
     * higher-priority alternative.
     *
     * @param operatorID the operator whose restore state is requested
     * @return {@code PrioritizedOperatorSubtaskState.emptyNotRestored()} when there is nothing to
     *     restore for this operator, otherwise the prioritized alternatives
     */
    @Override
    public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID) {
        // No restore information from the JobManager: nothing to restore.
        if (jobManagerTaskRestore == null) {
            return PrioritizedOperatorSubtaskState.emptyNotRestored();
        }

        // Ground truth: the snapshot the JobManager assigned to this operator.
        final OperatorSubtaskState remoteState =
                jobManagerTaskRestore.getTaskStateSnapshot().getSubtaskStateByOperatorID(operatorID);
        if (remoteState == null) {
            return PrioritizedOperatorSubtaskState.emptyNotRestored();
        }

        // Alternative: the task-local snapshot of the restored checkpoint. Local snapshots of every
        // other checkpoint are pruned from the local store.
        final long restoreCheckpointId = jobManagerTaskRestore.getRestoreCheckpointId();
        final TaskStateSnapshot localSnapshot = localStateStore.retrieveLocalState(restoreCheckpointId);
        localStateStore.pruneMatchingCheckpoints((long candidateId) -> candidateId != restoreCheckpointId);

        final OperatorSubtaskState localState = localSnapshot != null
                ? localSnapshot.getSubtaskStateByOperatorID(operatorID)
                : null;
        final List<OperatorSubtaskState> alternativesByPriority = localState != null
                ? Collections.singletonList(localState)
                : Collections.<OperatorSubtaskState>emptyList();

        LOG.debug("Operator {} has remote state {} from job manager and local state alternatives {} from local " +
            "state store {}.", operatorID, remoteState, alternativesByPriority, localStateStore);

        return new PrioritizedOperatorSubtaskState.Builder(remoteState, alternativesByPriority, true)
                .build();
    }
}
在获得了PrioritizedOperatorSubtaskState之后就可以进行状态的恢复了;状态恢复和创建state backend耦合在一起,借助BackendRestorerProcedure来完成,具体的逻辑在BackendRestorerProcedure.createAndRestore方法中。
/**
 * Excerpt: creates (and, when restoring, restores) the state backends, raw state streams and
 * internal timer service manager of a single stream operator.
 */
public class StreamTaskStateInitializerImpl implements StreamTaskStateInitializer {

    /**
     * Builds the {@link StreamOperatorStateContext} of an operator, restoring its state from the
     * prioritized snapshots supplied by the task state manager.
     *
     * <p>If any step fails, everything created so far is cleaned up before the failure is
     * propagated. Cleanup means unregistering from {@code streamTaskCloseableRegistry}, closing,
     * and, for the backends, disposing. The failure is rethrown with the original exception as its
     * cause, so it is never swallowed.
     *
     * @param operatorID id of the operator whose state is initialized
     * @param operatorClassName class name of the operator, used in the subtask description
     * @param keyContext key context passed to the internal timer service manager
     * @param keySerializer serializer of the state key; may be null
     * @param streamTaskCloseableRegistry registry that the created closeables are registered with
     * @param metricGroup metric group passed to the keyed state backend
     * @return the initialized state context
     * @throws Exception if creating or restoring any part of the operator state fails
     */
    @Override
    public StreamOperatorStateContext streamOperatorStateContext(
            @Nonnull OperatorID operatorID,
            @Nonnull String operatorClassName,
            @Nonnull KeyContext keyContext,
            @Nullable TypeSerializer<?> keySerializer,
            @Nonnull CloseableRegistry streamTaskCloseableRegistry,
            @Nonnull MetricGroup metricGroup) throws Exception {

        TaskInfo taskInfo = environment.getTaskInfo();
        OperatorSubtaskDescriptionText operatorSubtaskDescription =
                new OperatorSubtaskDescriptionText(
                        operatorID,
                        operatorClassName,
                        taskInfo.getIndexOfThisSubtask(),
                        taskInfo.getNumberOfParallelSubtasks());
        final String operatorIdentifierText = operatorSubtaskDescription.toString();

        // First obtain the prioritized snapshots this operator should be restored from.
        final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =
                taskStateManager.prioritizedOperatorState(operatorID);

        AbstractKeyedStateBackend<?> keyedStatedBackend = null;
        OperatorStateBackend operatorStateBackend = null;
        CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
        CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
        InternalTimeServiceManager<?> timeServiceManager;

        try {
            // -------------- Keyed State Backend --------------
            keyedStatedBackend = keyedStatedBackend(
                    keySerializer,
                    operatorIdentifierText,
                    prioritizedOperatorSubtaskStates,
                    streamTaskCloseableRegistry,
                    metricGroup);

            // -------------- Operator State Backend --------------
            operatorStateBackend = operatorStateBackend(
                    operatorIdentifierText,
                    prioritizedOperatorSubtaskStates,
                    streamTaskCloseableRegistry);

            // -------------- Raw State Streams --------------
            rawKeyedStateInputs = rawKeyedStateInputs(
                    prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
            streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);

            rawOperatorStateInputs = rawOperatorStateInputs(
                    prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
            streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

            // -------------- Internal Timer Service Manager --------------
            timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);

            // -------------- Preparing return value --------------
            return new StreamOperatorStateContextImpl(
                    prioritizedOperatorSubtaskStates.isRestored(),
                    operatorStateBackend,
                    keyedStatedBackend,
                    timeServiceManager,
                    rawOperatorStateInputs,
                    rawKeyedStateInputs);
        } catch (Exception ex) {
            // Cleanup if something went wrong before the results got published; otherwise the
            // partially created backends and streams would stay registered and open.
            if (keyedStatedBackend != null) {
                unregisterAndCloseQuietly(streamTaskCloseableRegistry, keyedStatedBackend, ex);
                // release resources (e.g. native resources) held by the backend
                keyedStatedBackend.dispose();
            }
            if (operatorStateBackend != null) {
                unregisterAndCloseQuietly(streamTaskCloseableRegistry, operatorStateBackend, ex);
                operatorStateBackend.dispose();
            }
            unregisterAndCloseQuietly(streamTaskCloseableRegistry, rawKeyedStateInputs, ex);
            unregisterAndCloseQuietly(streamTaskCloseableRegistry, rawOperatorStateInputs, ex);

            // Never swallow the failure: propagate it with the original exception as the cause.
            throw new Exception("Exception while creating StreamOperatorStateContext.", ex);
        }
    }

    /**
     * Unregisters {@code closeable} from {@code registry} and closes it if it was still registered
     * there. A failure while closing is attached to {@code primary} as a suppressed exception
     * rather than thrown, so it cannot mask the original failure.
     */
    private static void unregisterAndCloseQuietly(
            CloseableRegistry registry, java.io.Closeable closeable, Exception primary) {
        if (closeable == null || !registry.unregisterCloseable(closeable)) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception closeFailure) {
            primary.addSuppressed(closeFailure);
        }
    }
}