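State Saving and Recovery in Flink Checkpoints

Saving Checkpoint State

As described in the previous article, checkpoint barriers flow from upstream operators to downstream operators (a barrier is an event in the stream, not an operator). In a downstream Task, receiving barriers is delegated to CheckpointBarrierHandler.getNextNonBlocked(), so the CheckpointBarrierHandler can start checkpoint-related work as soon as it obtains a CheckpointBarrier. This eventually calls StreamTask#performCheckpoint(), which forwards the barrier downstream and triggers the state snapshot of the current operators.

Once a checkpoint has been triggered, the most important job of a Task is to write the state snapshot of every operator in that Task to external storage. The external storage may be a distributed file system or the JobManager's memory. The checkpoint itself starts in StreamTask.performCheckpoint, which has three parts:

  • prepare the checkpoint, which usually involves little or no work;
  • send the CheckpointBarrier downstream;
  • store the checkpoint snapshot.
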
class StreamTask {
    private boolean performCheckpoint(
          CheckpointMetaData checkpointMetaData,
          CheckpointOptions checkpointOptions,
          CheckpointMetrics checkpointMetrics) throws Exception {
       LOG.debug("Starting checkpoint ({}) {} on task {}", checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
    
       synchronized (lock) {
          if (isRunning) {
             // we can do a checkpoint
    
             // All of the following steps happen as an atomic step from the perspective of barriers and
             // records/watermarks/timers/callbacks.
             // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
             // checkpoint alignments
    
             // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
             //           The pre-barrier work should be nothing or minimal in the common case.
             // pre-barrier hook of the operators (a no-op in the common case)
             operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
    
             // Step (2): Send the checkpoint barrier downstream
             // send the CheckpointBarrier to the downstream tasks
             operatorChain.broadcastCheckpointBarrier(
                   checkpointMetaData.getCheckpointId(),
                   checkpointMetaData.getTimestamp(),
                   checkpointOptions);
    
             // Step (3): Take the state snapshot. This should be largely asynchronous, to not
             //           impact progress of the streaming topology
             // store the checkpoint snapshot of the current task
             checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
             return true;
          }
          else {
             // we cannot perform our checkpoint - let the downstream operators know that they
             // should not wait for any input from this operator
    
             // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
             // yet be created
             final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
             Exception exception = null;
    
             for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
                try {
                   streamRecordWriter.broadcastEvent(message); // broadcast the CancelCheckpointMarker downstream
                } catch (Exception e) {
                   exception = ExceptionUtils.firstOrSuppressed(
                      new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
                      exception);
                }
             }
             if (exception != null) {
                throw exception;
             }
             return false;
          }
       }
    }
}
           

The logic that actually stores the snapshot is wrapped in checkpointState(), shown here:

class StreamTask {
    private void checkpointState(
          CheckpointMetaData checkpointMetaData,
          CheckpointOptions checkpointOptions,
          CheckpointMetrics checkpointMetrics) throws Exception {
       // 1. resolve the CheckpointStorageLocation
       CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
             checkpointMetaData.getCheckpointId(),
             checkpointOptions.getTargetLocation());
       // 2. wrap the storage procedure in a CheckpointingOperation
       CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
          this,
          checkpointMetaData,
          checkpointOptions,
          storage,
          checkpointMetrics);
    
       checkpointingOperation.executeCheckpointing(); // start storing the checkpoint
    }
}
           

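The snapshot of each operator is abstracted as OperatorSnapshotFutures, which holds the snapshot results for operator state and keyed state, each in a managed and a raw variant: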

public class OperatorSnapshotFutures {
	@Nonnull
	private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;
	@Nonnull
	private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;
	@Nonnull
	private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;
	@Nonnull
	private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;
}
           

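The snapshot process is wrapped in CheckpointingOperation. Because a StreamTask may contain several chained operators, it keeps a Map from OperatorID to OperatorSnapshotFutures. Within CheckpointingOperation the snapshot runs in two phases: the first is synchronous, the second asynchronous: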

class StreamTask {
	private static final class CheckpointingOperation {
        private final StreamTask<?, ?> owner;   // the owning StreamTask (e.g. SourceStreamTask, OneInputStreamTask)
        
        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointOptions checkpointOptions;
        private final CheckpointMetrics checkpointMetrics;
        private final CheckpointStreamFactory storageLocation;
        
        private final StreamOperator<?>[] allOperators;  // all operators in the current operatorChain
        
        private long startSyncPartNano;
        private long startAsyncPartNano;
        
        // ------------------------
        // OperatorID -> OperatorSnapshotFutures
        private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
        
        // execute the checkpoint snapshot
        public void executeCheckpointing() throws Exception {
           startSyncPartNano = System.nanoTime();
        
           try {
              // 1. synchronous part
              for (StreamOperator<?> op : allOperators) {
                 checkpointStreamOperator(op);
              }
              startAsyncPartNano = System.nanoTime();
              checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
        
              // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
              // 2. asynchronous part
              // a checkpoint can be configured to run synchronously or asynchronously;
              // in the synchronous case all runnable futures are already complete at this point
              AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                 owner,
                 operatorSnapshotsInProgress,
                 checkpointMetaData,
                 checkpointMetrics,
                 startAsyncPartNano);
              owner.cancelables.registerCloseable(asyncCheckpointRunnable);
              owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
           } catch (Exception ex) {
              // Cleanup to release resources
              ........
           }
        }

        @SuppressWarnings("deprecation")
        private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
           if (null != op) {
              // call StreamOperator.snapshotState to take the snapshot;
              // the returned runnable futures may or may not have completed yet
              OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions,
                    storageLocation);
              operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
           }
        }
    }
}
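
The two phases can be modelled with nothing but the JDK. The following is a minimal sketch, not Flink code: the class and method names are hypothetical, and a string stands in for a real state handle. The synchronous part creates one future per operator on the calling thread, and a worker thread later runs the futures and collects the results.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

/** Simplified model of a two-phase checkpoint; all names are hypothetical, not Flink classes. */
final class TwoPhaseSnapshotSketch {

    /** Synchronous part: pin the current value, defer the (pretend) write into a future. */
    static RunnableFuture<String> snapshotOperator(String operatorId, int currentState) {
        final int captured = currentState;
        return new FutureTask<String>(() -> operatorId + "=" + captured);
    }

    /** Runs both phases and returns the collected "state handle" of each operator. */
    static Map<String, String> executeCheckpointing(Map<String, Integer> operatorStates) {
        // Phase 1 (synchronous): one future per operator, created on the calling thread.
        Map<String, RunnableFuture<String>> inProgress = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : operatorStates.entrySet()) {
            inProgress.put(e.getKey(), snapshotOperator(e.getKey(), e.getValue()));
        }

        // Phase 2 (asynchronous): a worker thread runs the futures and gathers the results.
        ExecutorService asyncPool = Executors.newSingleThreadExecutor();
        try {
            return asyncPool.submit(() -> {
                Map<String, String> handles = new LinkedHashMap<>();
                for (Map.Entry<String, RunnableFuture<String>> e : inProgress.entrySet()) {
                    e.getValue().run(); // does nothing if the future already completed
                    handles.put(e.getKey(), e.getValue().get());
                }
                return handles;
            }).get(10, TimeUnit.SECONDS);
        } catch (Exception ex) {
            throw new IllegalStateException("checkpoint failed", ex);
        } finally {
            asyncPool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> states = new LinkedHashMap<>();
        states.put("source", 3);
        states.put("map", 7);
        System.out.println(executeCheckpointing(states));
    }
}
```

Because FutureTask.run() has no effect on a task that has already completed, the asynchronous part can treat futures finished in the synchronous phase and pending futures the same way.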
           

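Synchronous phase

In the synchronous phase, StreamOperator.snapshotState is called on every operator in turn, and each call returns runnable futures. Depending on whether the checkpoint is configured to be synchronous or asynchronous, these futures may already be complete or may still be pending: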

interface StreamOperator<OUT> {
    /**
     * Called to draw a state snapshot from the operator.
     *
     * @return a runnable future to the state handle that points to the snapshotted state. For synchronous implementations,
     * the runnable might already be finished.
     *
     * @throws Exception exception that happened during snapshotting.
     */
    OperatorSnapshotFutures snapshotState(
       long checkpointId,
       long timestamp,
       CheckpointOptions checkpointOptions,
       CheckpointStreamFactory storageLocation) throws Exception;
}

public abstract class AbstractStreamOperator<OUT>
		implements StreamOperator<OUT>, Serializable {
    @Override
    public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
          CheckpointStreamFactory factory) throws Exception {
       KeyGroupRange keyGroupRange = null != keyedStateBackend ?
             keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
             
       OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
       try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
             checkpointId,
             timestamp,
             factory,
             keyGroupRange,
             getContainingTask().getCancelables())) {
          // snapshot the state
          snapshotState(snapshotContext);
          // raw state: subclasses must implement the writing of raw state snapshots themselves;
          // timers are written as raw keyed state
          snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
          snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
          // write the managed operator state snapshot
          if (null != operatorStateBackend) {
             snapshotInProgress.setOperatorStateManagedFuture(
                operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
          }
          // write the managed keyed state snapshot
          if (null != keyedStateBackend) {
             snapshotInProgress.setKeyedStateManagedFuture(
                keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
          }
       } catch (Exception snapshotException) {
          try {
             snapshotInProgress.cancel();
          } catch (Exception e) {
             snapshotException.addSuppressed(e);
          }
          String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " + getOperatorName() + ".";
          if (!getContainingTask().isCanceled()) {
             LOG.info(snapshotFailMessage, snapshotException);
          }
          throw new Exception(snapshotFailMessage, snapshotException);
       }
       return snapshotInProgress;
    }
}
           

The per-operator hook invoked above, AbstractStreamOperator#snapshotState(snapshotContext), looks like this:

/**
 * Stream operators with state, which want to participate in a snapshot need to override this hook method.
 * @param context context that provides information and means required for taking a snapshot
 */
public void snapshotState(StateSnapshotContext context) throws Exception {
   final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
   //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
   // all timers are written as raw keyed state
   if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
      ((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {
      KeyedStateCheckpointOutputStream out;
      try {
         out = context.getRawKeyedOperatorStateOutput();
      } catch (Exception exception) {
         throw new Exception("Could not open raw keyed operator state stream for " + getOperatorName() + '.', exception);
      }

      try {
         KeyGroupsList allKeyGroups = out.getKeyGroupList();
         for (int keyGroupIdx : allKeyGroups) {
            out.startNewKeyGroup(keyGroupIdx);

            timeServiceManager.snapshotStateForKeyGroup(
               new DataOutputViewStreamWrapper(out), keyGroupIdx);
         }
      } catch (Exception exception) {
         throw new Exception("Could not write timer service of " + getOperatorName() +
            " to checkpoint state stream.", exception);
      } finally {
         try {
            out.close();
         } catch (Exception closeException) {
            LOG.warn("Could not close raw keyed operator state stream for {}. This " +
               "might have prevented deleting some state data.", getOperatorName(), closeException);
         }
      }
   }
}
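
The loop over key groups is easier to follow with a toy stream. The sketch below is an assumption-laden model, not Flink's KeyedStateCheckpointOutputStream: it assumes that starting a new key group simply records the current stream position, so that a reader can later seek to the data of one key group. All names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical stand-in for a key-group-partitioned output stream; not a Flink class. */
final class KeyGroupStreamSketch {
    private final DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
    private final int firstKeyGroup;
    private final long[] offsets; // offsets[i] = start position of key group (firstKeyGroup + i)

    KeyGroupStreamSketch(int firstKeyGroup, int numberOfKeyGroups) {
        this.firstKeyGroup = firstKeyGroup;
        this.offsets = new long[numberOfKeyGroups];
        Arrays.fill(offsets, -1L); // -1 marks a key group that was never started
    }

    /** Remembers where the data of the given key group begins. */
    void startNewKeyGroup(int keyGroup) {
        offsets[keyGroup - firstKeyGroup] = out.size();
    }

    /** Writes one timer as an 8-byte timestamp. */
    void writeTimer(long timestamp) {
        try {
            out.writeLong(timestamp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes the timers of each key group in order and returns the recorded offsets. */
    static long[] snapshotTimers(int firstKeyGroup, long[][] timersPerKeyGroup) {
        KeyGroupStreamSketch stream =
            new KeyGroupStreamSketch(firstKeyGroup, timersPerKeyGroup.length);
        for (int i = 0; i < timersPerKeyGroup.length; i++) {
            stream.startNewKeyGroup(firstKeyGroup + i);
            for (long timer : timersPerKeyGroup[i]) {
                stream.writeTimer(timer);
            }
        }
        return stream.offsets.clone();
    }

    public static void main(String[] args) {
        long[][] timers = { {100L, 200L}, {}, {300L} };
        System.out.println(Arrays.toString(snapshotTimers(4, timers))); // prints [0, 16, 16]
    }
}
```

An empty key group shares its offset with the next one, which is why the second and third entries are equal in the example.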
           

Other StreamOperator implementations inherit from AbstractStreamOperator. AbstractUdfStreamOperator, for example, extends the base class and handles checkpoints as follows:

public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
      extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
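       super.snapshotState(context); // first call the parent method (AbstractStreamOperator#snapshotState) to write the timers
       // then invoke the snapshot logic of the user function (dispatched via instanceof checks, see below)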
       StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
    }
}

public final class StreamingFunctionUtils {
    public static void snapshotFunctionState(
          StateSnapshotContext context,
          OperatorStateBackend backend,
          Function userFunction) throws Exception {
       Preconditions.checkNotNull(context);
       Preconditions.checkNotNull(backend);
       while (true) {
          if (trySnapshotFunctionState(context, backend, userFunction)) { // let the user-defined checkpoint method handle the state
             break;
          }
          // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
          if (userFunction instanceof WrappingFunction) {
             userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
          } else {
             break;
          }
       }
    }
    
    private static boolean trySnapshotFunctionState(
          StateSnapshotContext context,
          OperatorStateBackend backend,
          Function userFunction) throws Exception {
       // if the user function implements CheckpointedFunction, call snapshotState to create the snapshot
       if (userFunction instanceof CheckpointedFunction) {
          ((CheckpointedFunction) userFunction).snapshotState(context);
          return true;
       }
       // if the user function implements ListCheckpointed
       if (userFunction instanceof ListCheckpointed) {
          // first call snapshotState to obtain the current state
          @SuppressWarnings("unchecked")
          List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
                snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
          // get a reference to the list state held by the backend
          ListState<Serializable> listState = backend.
                getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
          // clear the ListState currently held by the backend
          listState.clear();
          // add the current state to the backend, element by element
          if (null != partitionableState) {
             try {
                for (Serializable statePartition : partitionableState) {
                   listState.add(statePartition);
                }
             } catch (Exception e) {
                listState.clear();
                throw new Exception("Could not write partitionable state to operator " +
                   "state backend.", e);
             }
          }
          return true;
       }
       return false;
    }
}
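
The "try, then unwrap and retry" loop in snapshotFunctionState is a plain type dispatch. Here is a minimal, self-contained sketch of the same control flow. The interfaces are hypothetical stand-ins: Snapshottable plays the role of CheckpointedFunction and Wrapper plays the role of WrappingFunction; none of them are Flink types.

```java
import java.util.ArrayList;
import java.util.List;

/** Models the unwrap-and-retry dispatch; the interfaces are hypothetical stand-ins. */
final class UnwrapDispatchSketch {
    interface UserFunction {}

    /** Plays the role of CheckpointedFunction. */
    interface Snapshottable extends UserFunction {
        void snapshotState(List<String> sink);
    }

    /** Plays the role of WrappingFunction. */
    interface Wrapper extends UserFunction {
        UserFunction getWrappedFunction();
    }

    static final class CountingFunction implements Snapshottable {
        private final int count;
        CountingFunction(int count) { this.count = count; }
        @Override public void snapshotState(List<String> sink) { sink.add("count=" + count); }
    }

    static final class LoggingWrapper implements Wrapper {
        private final UserFunction inner;
        LoggingWrapper(UserFunction inner) { this.inner = inner; }
        @Override public UserFunction getWrappedFunction() { return inner; }
    }

    /** Returns true if some function in the wrapper chain took a snapshot. */
    static boolean snapshotFunctionState(UserFunction userFunction, List<String> sink) {
        UserFunction current = userFunction;
        while (true) {
            if (current instanceof Snapshottable) {
                ((Snapshottable) current).snapshotState(sink);
                return true;
            }
            if (current instanceof Wrapper) {
                current = ((Wrapper) current).getWrappedFunction(); // peel one layer and retry
            } else {
                return false; // stateless function: nothing to snapshot
            }
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        UserFunction wrapped = new LoggingWrapper(new LoggingWrapper(new CountingFunction(42)));
        System.out.println(snapshotFunctionState(wrapped, sink) + " " + sink);
    }
}
```

The loop stops at the first function in the chain that can snapshot itself, so a stateful wrapper takes precedence over the function it wraps.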
           

The analysis above shows how a checkpoint is wired to user-defined functions. Next, consider how state managed by Flink is written to storage, that is:

operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); // write operator state
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); // write keyed state
           

Start with operatorStateBackend.snapshot(). The operatorStateBackend is initialized in AbstractStreamOperator#initializeState(), which delegates the actual instantiation to StreamTaskStateInitializerImpl#streamOperatorStateContext(). That method ultimately calls stateBackend.createOperatorStateBackend(environment, operatorIdentifierText), so the concrete stateBackend instance determines which operatorStateBackend and keyedStateBackend are created:

  • the operatorStateBackend implementation is DefaultOperatorStateBackend;
  • the keyedStateBackend implementation depends on the concrete state backend, for example:
    • FsStateBackend creates a HeapKeyedStateBackend;
    • MemoryStateBackend creates a HeapKeyedStateBackend;
    • RocksDBStateBackend creates a RocksDBKeyedStateBackend.

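1. DefaultOperatorStateBackend hands the real work to DefaultOperatorStateBackendSnapshotStrategy. It first makes a deep copy of every registered operator state (both list state and broadcast state), then wraps the actual write in an asynchronous FutureTask whose main steps are:

  1. open the output stream
  2. write the state metadata
  3. write the state
  4. close the output stream and obtain the state handle

If asynchronous checkpointing is not enabled, this FutureTask runs immediately during the synchronous phase.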
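
The copy-then-write pattern described above can be sketched with the JDK alone. This is a simplified model under stated assumptions, not the DefaultOperatorStateBackend implementation: a List of Integer stands in for a registered list state, a byte array stands in for the state handle, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Copy-then-write snapshot of a list state; a simplified model, not Flink's implementation. */
final class OperatorStateSnapshotSketch {

    static RunnableFuture<byte[]> snapshot(String stateName, List<Integer> liveState, boolean asynchronous) {
        // Synchronous part: copy the state so later updates cannot leak into the snapshot.
        // (Integer is immutable, so copying the list is enough to act as a deep copy here.)
        final List<Integer> copy = new ArrayList<>(liveState);

        FutureTask<byte[]> task = new FutureTask<byte[]>(() -> {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream(); // 1. open the output stream
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(stateName);                               // 2. write the metadata
                out.writeInt(copy.size());
                for (int value : copy) {                               // 3. write the state
                    out.writeInt(value);
                }
            }                                                          // 4. close the stream ...
            return bytes.toByteArray();                                //    ... and return the "handle"
        });

        if (!asynchronous) {
            task.run(); // synchronous mode: the write completes before snapshot() returns
        }
        return task;
    }

    /** Snapshots, mutates the live state, finishes the snapshot, and returns its size in bytes. */
    static int snapshotSizeAfterMutation(boolean asynchronous) {
        List<Integer> live = new ArrayList<>(Arrays.asList(1, 2, 3));
        RunnableFuture<byte[]> future = snapshot("offsets", live, asynchronous);
        live.add(4);  // record processing continues after the synchronous part
        future.run(); // asynchronous part; does nothing if the task already ran
        try {
            return future.get().length;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(snapshotSizeAfterMutation(true) + " " + snapshotSizeAfterMutation(false));
    }
}
```

In both modes the snapshot contains only the three elements that existed when snapshot() was called, which is the reason for taking the copy in the synchronous part.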

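2. Writing keyed state follows the same basic flow. Keyed state, however, has several storage implementations (heap-based and RocksDB-based), and the RocksDB one also supports incremental checkpoints, so it is more complex than operator state. In addition, Flink 1.5.0 introduced a local state storage optimization that keeps a copy of keyed state locally on the TaskManager, with the aim of reducing recovery time and network overhead.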

public class DefaultOperatorStateBackend implements OperatorStateBackend {
    @Override
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
       long checkpointId,
       long timestamp,
       @Nonnull CheckpointStreamFactory streamFactory,
       @Nonnull CheckpointOptions checkpointOptions) throws Exception {
       long syncStartTime = System.currentTimeMillis();
       // delegate to the internal DefaultOperatorStateBackendSnapshotStrategy
       RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
          snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
       snapshotStrategy.logSyncCompleted(streamFactory, syncStartTime);
       return snapshotRunner;
    }
}

/**
 * Snapshot strategy for this backend.
 */
private class DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy<OperatorStateHandle> {
   // constructor of the inner class
   protected DefaultOperatorStateBackendSnapshotStrategy() {
      super("DefaultOperatorStateBackend snapshot");
   }

   @Nonnull
   @Override
   public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(  // where the snapshot is actually taken
      final long checkpointId,
      final long timestamp,
      @Nonnull final CheckpointStreamFactory streamFactory,
      @Nonnull final CheckpointOptions checkpointOptions) throws IOException {
      if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
         return DoneFuture.of(SnapshotResult.empty());
      }
      final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies = new HashMap<>(registeredOperatorStates.size());
      final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies = new HashMap<>(registeredBroadcastStates.size());

      ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
      Thread.currentThread().setContextClassLoader(userClassloader);
      try {
         // eagerly create deep copies of the list and the broadcast states (if any)
         // in the synchronous phase, so that we can use them in the async writing.
         // deep-copy all registered list states and broadcast states
         if (!registeredOperatorStates.isEmpty()) {
            for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
               PartitionableListState<?> listState = entry.getValue();
               if (null != listState) {
                  listState = listState.deepCopy();
               }
               registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
            }
         }

         if (!registeredBroadcastStates.isEmpty()) {
            for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
               BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
               if (null != broadcastState) {
                  broadcastState = broadcastState.deepCopy();
               }
               registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
            }
         }
      } finally {
         Thread.currentThread().setContextClassLoader(snapshotClassLoader);
      }
      // wrap the main write operation in an asynchronous FutureTask
      AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
         new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {
            @Override
            protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {
               // create the state output stream
               CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                  streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
               registerCloseableForCancellation(localOut);
               
               // get the registered operator state infos ...
               // collect the metadata
               List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots = new ArrayList<>(registeredOperatorStatesDeepCopies.size());
               for (Map.Entry<String, PartitionableListState<?>> entry :
                  registeredOperatorStatesDeepCopies.entrySet()) {
                  operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
               }
               // ... get the registered broadcast operator state infos ...
               List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots = new ArrayList<>(registeredBroadcastStatesDeepCopies.size());
               for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                  registeredBroadcastStatesDeepCopies.entrySet()) {
                  broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
               }

               // ... write them all in the checkpoint stream ...
               // write the metadata
               DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
               OperatorBackendSerializationProxy backendSerializationProxy =
                  new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
               backendSerializationProxy.write(dov);

               // ... and then go for the states ...

               // we put BOTH normal and broadcast state metadata here
               // write the state
               int initialMapCapacity = registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
               final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData = new HashMap<>(initialMapCapacity);

               for (Map.Entry<String, PartitionableListState<?>> entry :
                  registeredOperatorStatesDeepCopies.entrySet()) {

                  PartitionableListState<?> value = entry.getValue();
                  long[] partitionOffsets = value.write(localOut);
                  OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                  writtenStatesMetaData.put(
                     entry.getKey(),
                     new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
               }

               // ... and the broadcast states themselves ...
               for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                  registeredBroadcastStatesDeepCopies.entrySet()) {

                  BackendWritableBroadcastState<?, ?> value = entry.getValue();
                  long[] partitionOffsets = {value.write(localOut)};
                  OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                  writtenStatesMetaData.put(
                     entry.getKey(),
                     new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
               }

               // ... and, finally, create the state handle.
               OperatorStateHandle retValue = null;
               if (unregisterCloseableFromCancellation(localOut)) {
                  // close the output stream and obtain the state handle, which can later be used to read the state
                  StreamStateHandle stateHandle = localOut.closeAndGetHandle();
                  if (stateHandle != null) {
                     retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
                  }
                  return SnapshotResult.of(retValue);
               } else {
                  throw new IOException("Stream was already unregistered.");
               }
            }

            @Override
            protected void cleanupProvidedResources() {
               // nothing to do
            }

            @Override
            protected void logAsyncSnapshotComplete(long startTime) {
               if (asynchronousSnapshots) {
                  logAsyncCompleted(streamFactory, startTime);
               }
            }
         };

      final FutureTask<SnapshotResult<OperatorStateHandle>> task =
         snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);
      // for a non-asynchronous checkpoint, run the FutureTask right here, so the state is written in the synchronous phase
      if (!asynchronousSnapshots) {
         task.run();
      }
      return task;
   }
}
           

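Asynchronous phase

In CheckpointingOperation#executeCheckpointing(), the asynchronous phase is wrapped in AsyncCheckpointRunnable. Its main tasks are:

  • run the FutureTasks created in the synchronous phase;
  • once they complete, send an acknowledgement to the CheckpointCoordinator in the JobMaster.
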
class StreamTask {
	protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {
        @Override
        public void run() {
           FileSystemSafetyNet.initializeSafetyNetForThread();
           try {  // state to be reported to the JobManager, plus the state kept locally on the TaskExecutor
              TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
                 new TaskStateSnapshot(operatorSnapshotsInProgress.size());
              TaskStateSnapshot localTaskOperatorSubtaskStates =
                 new TaskStateSnapshot(operatorSnapshotsInProgress.size());
              
              // finish writing the state of every operator:
              // for a synchronous checkpoint the state was already written before this point;
              // for an asynchronous checkpoint the state is written here
              for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
                 OperatorID operatorID = entry.getKey();
                 OperatorSnapshotFutures snapshotInProgress = entry.getValue();
        
                 // finalize the async part of all by executing all snapshot runnables
                 OperatorSnapshotFinalizer finalizedSnapshots =
                    new OperatorSnapshotFinalizer(snapshotInProgress);
        
                 jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                    operatorID,
                    finalizedSnapshots.getJobManagerOwnedState());
        
                 localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                    operatorID,
                    finalizedSnapshots.getTaskLocalState());
              }
        
              final long asyncEndNanos = System.nanoTime();
              final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;
              checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
        
              if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
                 CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
                 // report that the snapshot is complete
                 reportCompletedSnapshotStates(
                    jobManagerTaskOperatorSubtaskStates,
                    localTaskOperatorSubtaskStates,
                    asyncDurationMillis);
              } else {
                 LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                    owner.getName(),
                    checkpointMetaData.getCheckpointId());
              }
           } catch (Exception e) {
              handleExecutionException(e);
           } finally {
              owner.cancelables.unregisterCloseable(this);
              FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
           }
        }

        // report the completed snapshot to the CheckpointCoordinator in the JobMaster
        private void reportCompletedSnapshotStates(
           TaskStateSnapshot acknowledgedTaskStateSnapshot,
           TaskStateSnapshot localTaskStateSnapshot,
           long asyncDurationMillis) {
           TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();

           boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
           boolean hasLocalState = localTaskStateSnapshot.hasState();
           // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
           // to stateless tasks on restore. This enables simple job modifications that only concern
           // stateless without the need to assign them uids to match their (always empty) states.
           taskStateManager.reportTaskStateSnapshots(
              checkpointMetaData,
              checkpointMetrics,
              hasAckState ? acknowledgedTaskStateSnapshot : null,
              hasLocalState ? localTaskStateSnapshot : null);
        }
    }
}

public class TaskStateManagerImpl implements TaskStateManager {
    @Override
    public void reportTaskStateSnapshots(
       @Nonnull CheckpointMetaData checkpointMetaData,
       @Nonnull CheckpointMetrics checkpointMetrics,
       @Nullable TaskStateSnapshot acknowledgedState,
       @Nullable TaskStateSnapshot localState) {
       long checkpointId = checkpointMetaData.getCheckpointId();
       // store the task-local copy of the state first
       localStateStore.storeLocalState(checkpointId, localState);
       // then send the ACK to the CheckpointCoordinator through the checkpointCoordinatorGateway RPC interface
       checkpointResponder.acknowledgeCheckpoint(
          jobId,
          executionAttemptID,
          checkpointId,
          checkpointMetrics,
          acknowledgedState);
    }
}
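
The compareAndSet call in AsyncCheckpointRunnable#run() decides whether the acknowledgement is sent at all. The sketch below models that race with an AtomicReference. It is an illustration only: the DISCARDED state and the close() behaviour are assumptions, since the excerpt above shows only the RUNNING and COMPLETED states, and the class name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Models the RUNNING to COMPLETED transition guard; names are hypothetical, not Flink classes. */
final class AsyncStateGuardSketch {
    enum State { RUNNING, DISCARDED, COMPLETED } // DISCARDED is an assumption for this sketch

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private volatile boolean acknowledged;

    /** Called by the async thread once all snapshot futures are done. */
    void finish() {
        if (state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            acknowledged = true; // stands in for reporting the snapshot to the coordinator
        }
        // otherwise the runnable was closed first, so no acknowledgement is sent
    }

    /** Called when the task is cancelled; only wins if finish() has not completed yet. */
    void close() {
        state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    boolean isAcknowledged() { return acknowledged; }

    /** Runs the two calls in the given order and reports whether an ack was sent. */
    static boolean ackSent(boolean closeFirst) {
        AsyncStateGuardSketch guard = new AsyncStateGuardSketch();
        if (closeFirst) {
            guard.close();
            guard.finish();
        } else {
            guard.finish();
            guard.close();
        }
        return guard.isAcknowledged();
    }

    public static void main(String[] args) {
        System.out.println(ackSent(false) + " " + ackSent(true)); // prints "true false"
    }
}
```

Whichever side performs its transition first wins, so a checkpoint is never both acknowledged and discarded.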
           

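Local state storage

Local state storage means that, when a checkpoint snapshot is written, a copy is also stored in the local file system of the TaskManager that runs the Task. On recovery the local copy can be tried first, which reduces network transfer. Local state storage applies only to keyed state. Taking the relatively simple HeapKeyedStateBackend as an example, it is implemented as follows: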

public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
    /**
     * Base class for the snapshots of the heap backend that outlines the algorithm and offers some hooks to realize
     * the concrete strategies. Subclasses must be threadsafe.
     */
    private class HeapSnapshotStrategy extends AbstractSnapshotStrategy<KeyedStateHandle> implements SnapshotStrategySynchronicityBehavior<K> {
       private final SnapshotStrategySynchronicityBehavior<K> snapshotStrategySynchronicityTrait;
    
       @Nonnull
       @Override
       public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
          long checkpointId,
          long timestamp,
          @Nonnull CheckpointStreamFactory primaryStreamFactory,
          @Nonnull CheckpointOptions checkpointOptions) throws IOException {
          ......
          // build the serialization proxy, then choose how the CheckpointStreamWithResultProvider is created
          final KeyedBackendSerializationProxy<K> serializationProxy =
             new KeyedBackendSerializationProxy<>(
                // TODO: this code assumes that writing a serializer is threadsafe, we should support to
                // get a serialized form already at state registration time in the future
                keySerializer,
                metaInfoSnapshots,
                !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator));
    
          final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier =
             localRecoveryConfig.isLocalRecoveryEnabled() ?
             
                () -> CheckpointStreamWithResultProvider.createDuplicatingStream(
                   checkpointId,
                   CheckpointedStateScope.EXCLUSIVE,
                   primaryStreamFactory,
                   localRecoveryConfig.getLocalStateDirectoryProvider()) :
                       
                () -> CheckpointStreamWithResultProvider.createSimpleStream(
                   CheckpointedStateScope.EXCLUSIVE,
                   primaryStreamFactory);
    
          //--------------------------------------------------- this becomes the end of sync part
          ........
       }
    }
}
           

The key point is that a different CheckpointStreamWithResultProvider is created depending on whether local state recovery is enabled:

public interface CheckpointStreamWithResultProvider extends Closeable {
    @Nonnull
    static CheckpointStreamWithResultProvider createSimpleStream(
       @Nonnull CheckpointedStateScope checkpointedStateScope,
       @Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException {
       
       CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
          primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
       return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
    }
    
    @Nonnull
    static CheckpointStreamWithResultProvider createDuplicatingStream(
       @Nonnegative long checkpointId,
       @Nonnull CheckpointedStateScope checkpointedStateScope,
       @Nonnull CheckpointStreamFactory primaryStreamFactory,
       @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider) throws IOException {
       // Create the output stream to the external (primary) storage
       CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
          primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
    
       try {
          File outFile = new File(   // build the local file for the secondary copy
             secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(checkpointId),
             String.valueOf(UUID.randomUUID()));
          Path outPath = new Path(outFile.toURI());
    
          CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
             new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);
          // Two output streams, primary and secondary; the secondary one writes to local storage
          return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(primaryOut, secondaryOut);
       } catch (IOException secondaryEx) {
          LOG.warn("Exception when opening secondary/local checkpoint output stream. " +
             "Continue only with the primary stream.", secondaryEx);
       }
       return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
    }
}
           

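So when local state recovery is enabled, two output streams are created: primaryOut writes to external storage and secondaryOut writes to local storage, and the state is written twice. The handle for the local copy is stored in the TaskLocalStateStore.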
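
The primary/secondary idea can be illustrated with a small, self-contained sketch in plain Java. DuplicatingOutputSketch is a hypothetical class written for this article, not Flink's implementation. It also assumes that a write failure on the local copy should not fail the checkpoint, which mirrors the way createDuplicatingStream above falls back to the primary stream when the local stream cannot be opened.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of writing state to a primary and a best-effort secondary stream. */
final class DuplicatingOutputSketch extends OutputStream {
    private final OutputStream primary; // external storage: a failure here fails the checkpoint
    private OutputStream secondary;     // local copy: best effort, set to null once it has failed

    DuplicatingOutputSketch(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    @Override
    public void write(int b) throws IOException {
        primary.write(b);
        if (secondary != null) {
            try {
                secondary.write(b);
            } catch (IOException e) {
                // Assumption: losing the local copy is acceptable, so we continue with primary only.
                secondary = null;
            }
        }
    }

    @Override
    public void close() throws IOException {
        primary.close();
        if (secondary != null) {
            try {
                secondary.close();
            } catch (IOException e) {
                secondary = null;
            }
        }
    }

    /** True while the local copy is still complete and usable. */
    boolean hasSecondary() {
        return secondary != null;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream remote = new ByteArrayOutputStream();
        ByteArrayOutputStream local = new ByteArrayOutputStream();
        try (DuplicatingOutputSketch out = new DuplicatingOutputSketch(remote, local)) {
            out.write("state".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(remote.size() + " " + local.size()); // prints "5 5"
    }
}
```

The asymmetry is deliberate in this sketch: the external copy is the one a restore can always rely on, while the local copy only speeds up recovery when it happens to be available.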

Restoring checkpoint state

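When a Flink job restarts after a failure or is started from a specified savepoint, the whole job must be restored to the state of that savepoint or of the last successful checkpoint. This happens in two phases: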

  • The CheckpointCoordinator loads the most recent successful CompletedCheckpoint and reassigns its state to the individual Executions (Tasks)
  • Each Task initializes its state when it starts

State assignment

First, after creating the ExecutionGraph, the JobMaster tries to restore state from the most recent successful checkpoint or to load a savepoint. Both paths end up calling CheckpointCoordinator.restoreLatestCheckpointedState():

public class CheckpointCoordinator {
    /**
     * Restores the latest checkpointed state.
     */
    public boolean restoreLatestCheckpointedState(
          Map<JobVertexID, ExecutionJobVertex> tasks,
          boolean errorIfNoCheckpoint,
          boolean allowNonRestoredState) throws Exception {
       synchronized (lock) {
          // .........
          // Restore from the latest checkpoint
          CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
          // re-assign the task states
          final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
          StateAssignmentOperation stateAssignmentOperation =
                new StateAssignmentOperation(latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState);
          stateAssignmentOperation.assignStates();
    
          // call master hooks for restore
          MasterHooks.restoreMasterHooks(
                masterHooks,
                latest.getMasterHookStates(),
                latest.getCheckpointID(),
                allowNonRestoredState,
                LOG);
          //.........
       }
    }
}
           

Finally, the state assigned to each Task is wrapped in a JobManagerTaskRestore and attached to the Execution via Execution.setInitialState(). The JobManagerTaskRestore is then shipped to the TaskExecutor as a field of the TaskDeploymentDescriptor.

Task state initialization

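Once the TaskDeploymentDescriptor has been submitted to the TaskExecutor, the TaskExecutor creates a TaskStateManager to manage the state of the Task. The TaskStateManager is built from the assigned JobManagerTaskRestore and the local state store, TaskLocalStateStore: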

class TaskExecutor {
    @Override
    public CompletableFuture<Acknowledge> submitTask(
          TaskDeploymentDescriptor tdd,
          JobMasterId jobMasterId,
          Time timeout) {
          // ..........
          // Local state store
        final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask(
           jobId,
           tdd.getAllocationId(),
           taskInformation.getJobVertexId(),
           tdd.getSubtaskIndex());
        // State assigned by the JobManager for the restore
        final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
        // Create the TaskStateManager
        final TaskStateManager taskStateManager = new TaskStateManagerImpl(
           jobId,
           tdd.getExecutionAttemptId(),
           localStateStore,
           taskRestore,
           checkpointResponder);
           
         // Create and start the Task
         // .........
	}
}
           

After the Task starts, StreamTask first calls its initializeState method, so that every operator in turn calls StreamOperator.initializeState() to initialize its state:

public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, Serializable {
    @Override
    public final void initializeState() throws Exception {
       final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
       final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());
       final CloseableRegistry streamTaskCloseableRegistry = Preconditions.checkNotNull(containingTask.getCancelables());
       final StreamTaskStateInitializer streamTaskStateManager = Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
       
       // Create the StreamOperatorStateContext. This step performs the actual state restore,
       // so operatorStateBackend and keyedStateBackend are brought back to the state of the last checkpoint.
       // The timeServiceManager is restored as well.
       final StreamOperatorStateContext context =
          streamTaskStateManager.streamOperatorStateContext(
             getOperatorID(),
             getClass().getSimpleName(),
             this,
             keySerializer,
             streamTaskCloseableRegistry,
             metrics);
       this.operatorStateBackend = context.operatorStateBackend();
       this.keyedStateBackend = context.keyedStateBackend();
    
       if (keyedStateBackend != null) {
          this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
       }
       timeServiceManager = context.internalTimerServiceManager();
    
       CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
       CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
    
       try {
          // StateInitializationContext exposes the state backends and the raw state streams; the operator uses it to initialize its state
          StateInitializationContext initializationContext = new StateInitializationContextImpl(
             context.isRestored(), // information whether we restore or start for the first time
             operatorStateBackend, // access to operator state backend
             keyedStateStore, // access to keyed state backend
             keyedStateInputs, // access to keyed state stream
             operatorStateInputs); // access to operator state stream
          // Run the state initialization implemented in subclasses, e.g. calling the UDF's state initialization method
          initializeState(initializationContext);
       } finally {
          closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
          closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
       }
    }

    /**
     * Stream operators with state which can be restored need to override this hook method.
     */
    public void initializeState(StateInitializationContext context) throws Exception {
    }
}

public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
       super.initializeState(context);
       // Let the user function run its state initialization
       StreamingFunctionUtils.restoreFunctionState(context, userFunction);
    }
}
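
The structure of the listing above is a template method: a final initializeState() prepares the context and then calls an overridable initializeState(context) hook. The following plain-Java sketch illustrates only that pattern. All class names (InitContextSketch, OperatorSketch, CountingOperatorSketch) and the map-based "snapshot" are hypothetical simplifications and are not part of Flink.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, heavily simplified stand-in for StateInitializationContext. */
final class InitContextSketch {
    private final boolean restored;
    private final Map<String, Long> restoredValues;

    InitContextSketch(boolean restored, Map<String, Long> restoredValues) {
        this.restored = restored;
        this.restoredValues = restoredValues;
    }

    /** True if the state comes from a checkpoint/savepoint, false on a fresh start. */
    boolean isRestored() {
        return restored;
    }

    /** Returns the restored value, or 0 if nothing was stored under that name. */
    long restoredValue(String name) {
        return restoredValues.getOrDefault(name, 0L);
    }
}

/** Base operator: the final method builds the context, the hook is left to subclasses. */
abstract class OperatorSketch {
    final void initializeState(boolean restored, Map<String, Long> snapshot) {
        InitContextSketch context = new InitContextSketch(restored, snapshot);
        initializeState(context);
    }

    /** Hook for stateful operators; does nothing by default. */
    void initializeState(InitContextSketch context) {
    }
}

/** Example subclass that either resumes its counter or starts from zero. */
final class CountingOperatorSketch extends OperatorSketch {
    long count;

    @Override
    void initializeState(InitContextSketch context) {
        count = context.isRestored() ? context.restoredValue("count") : 0L;
    }
}

final class InitializeStateDemo {
    public static void main(String[] args) {
        CountingOperatorSketch fresh = new CountingOperatorSketch();
        fresh.initializeState(false, new HashMap<>());

        Map<String, Long> snapshot = new HashMap<>();
        snapshot.put("count", 42L);
        CountingOperatorSketch recovered = new CountingOperatorSketch();
        recovered.initializeState(true, snapshot);

        System.out.println(fresh.count + " " + recovered.count); // prints "0 42"
    }
}
```

Keeping the outer method final means that the restore logic runs exactly once per operator, while subclasses only decide what to do with the restored state.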
           

The key step of the restore is building the StreamOperatorStateContext via StreamTaskStateInitializer.streamOperatorStateContext(). The StreamOperatorStateContext gives access to the state backends, the timer service manager, and so on:

public interface StreamOperatorStateContext {
	// Returns true if the states provided by this context are restored from a checkpoint/savepoint.
	boolean isRestored();

	// Returns the operator state backend for the stream operator.
	OperatorStateBackend operatorStateBackend();

	// Returns the keyed state backend for the stream operator. This method returns null for non-keyed operators.
	AbstractKeyedStateBackend<?> keyedStateBackend();

	// Returns the internal timer service manager for the stream operator. This method returns null for non-keyed operators.
	InternalTimeServiceManager<?> internalTimerServiceManager();

	// Returns an iterable to obtain input streams for previously stored operator state partitions that are assigned to this stream operator.
	CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs();

	// Returns an iterable to obtain input streams for previously stored keyed state partitions that are assigned to this operator. This method returns null for non-keyed operators.
	CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs();
}
           

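To build the StreamOperatorStateContext, the state handles that each operator has to restore are first obtained via TaskStateManager.prioritizedOperatorState(). These handles are then used to create and restore the state backends and the timers.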

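This is where PrioritizedOperatorSubtaskState comes in. It wraps several alternative OperatorSubtaskState snapshots that can (partially) substitute for one another, ordered by priority. The last entry in the list holds the complete state of the subtask but has the lowest priority. During a restore, state is read from the higher-priority handles first: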

public class PrioritizedOperatorSubtaskState {
	/** List of prioritized snapshot alternatives for managed operator state. */
	private final List<StateObjectCollection<OperatorStateHandle>> prioritizedManagedOperatorState;

	/** List of prioritized snapshot alternatives for raw operator state. */
	private final List<StateObjectCollection<OperatorStateHandle>> prioritizedRawOperatorState;

	/** List of prioritized snapshot alternatives for managed keyed state. */
	private final List<StateObjectCollection<KeyedStateHandle>> prioritizedManagedKeyedState;

	/** List of prioritized snapshot alternatives for raw keyed state. */
	private final List<StateObjectCollection<KeyedStateHandle>> prioritizedRawKeyedState;

	public static class Builder {
        /**
         * This helper method resolves the dependencies between the ground truth of the operator state obtained from the
         * job manager and potential alternatives for recovery, e.g. from a task-local source.
         */
        protected <T extends StateObject> List<StateObjectCollection<T>> resolvePrioritizedAlternatives(
           StateObjectCollection<T> jobManagerState,
           List<StateObjectCollection<T>> alternativesByPriority,
           BiFunction<T, T, Boolean> approveFun) {
        
           // Nothing to resolve if there are no alternatives, or the ground truth has already no state, or if we can
           // assume that a rescaling happened because we find more than one handle in the JM state (this is more a sanity
           // check).
           if (alternativesByPriority == null
              || alternativesByPriority.isEmpty()
              || !jobManagerState.hasState()
              || jobManagerState.size() != 1) {
              return Collections.singletonList(jobManagerState);
           }
        
           // As we know size is == 1
           T reference = jobManagerState.iterator().next();
        
           // This will contain the end result, we initialize it with the potential max. size.
           List<StateObjectCollection<T>> approved = new ArrayList<>(1 + alternativesByPriority.size());
        
           for (StateObjectCollection<T> alternative : alternativesByPriority) {
              // We found an alternative to the JM state if it has state, we have a 1:1 relationship, and the
              // approve-function signaled true.
              if (alternative != null
                 && alternative.hasState()
                 && alternative.size() == 1
                 && BooleanUtils.isTrue(approveFun.apply(reference, alternative.iterator().next()))) {
                 approved.add(alternative);
              }
           }
           // The state obtained from the JobManager (the ground truth) is always included as the last,
           // lowest-priority alternative.
           approved.add(jobManagerState);
           return Collections.unmodifiableList(approved);
        }
	}
}

public class TaskStateManagerImpl implements TaskStateManager {
    @Override
    public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID) {
       if (jobManagerTaskRestore == null) {
          return PrioritizedOperatorSubtaskState.emptyNotRestored();
       }
       // State snapshot obtained from the JobManager
       TaskStateSnapshot jobManagerStateSnapshot =
          jobManagerTaskRestore.getTaskStateSnapshot();
       OperatorSubtaskState jobManagerSubtaskState =
          jobManagerStateSnapshot.getSubtaskStateByOperatorID(operatorID);
          
       if (jobManagerSubtaskState == null) {
          return PrioritizedOperatorSubtaskState.emptyNotRestored();
       }
       // The local state snapshot serves as an alternative
       long restoreCheckpointId = jobManagerTaskRestore.getRestoreCheckpointId();
       TaskStateSnapshot localStateSnapshot =
          localStateStore.retrieveLocalState(restoreCheckpointId);
    
       localStateStore.pruneMatchingCheckpoints((long checkpointId) -> checkpointId != restoreCheckpointId);
       List<OperatorSubtaskState> alternativesByPriority = Collections.emptyList();
       if (localStateSnapshot != null) {
          OperatorSubtaskState localSubtaskState = localStateSnapshot.getSubtaskStateByOperatorID(operatorID);
          if (localSubtaskState != null) {
             alternativesByPriority = Collections.singletonList(localSubtaskState);
          }
       }
       LOG.debug("Operator {} has remote state {} from job manager and local state alternatives {} from local " +
             "state store {}.", operatorID, jobManagerSubtaskState, alternativesByPriority, localStateStore);
       // Build the PrioritizedOperatorSubtaskState
       PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder(
          jobManagerSubtaskState,
          alternativesByPriority,
          true);
       return builder.build();
    }
}
           

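With the PrioritizedOperatorSubtaskState in hand, the state can be restored. Restoring the state is coupled with creating the state backend and is done with the help of BackendRestorerProcedure; the actual logic is in BackendRestorerProcedure.createAndRestore.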

public class StreamTaskStateInitializerImpl implements StreamTaskStateInitializer {
    @Override
    public StreamOperatorStateContext streamOperatorStateContext(
       @Nonnull OperatorID operatorID,
       @Nonnull String operatorClassName,
       @Nonnull KeyContext keyContext,
       @Nullable TypeSerializer<?> keySerializer,
       @Nonnull CloseableRegistry streamTaskCloseableRegistry,
       @Nonnull MetricGroup metricGroup) throws Exception {
    
       TaskInfo taskInfo = environment.getTaskInfo();
       OperatorSubtaskDescriptionText operatorSubtaskDescription =
          new OperatorSubtaskDescriptionText(
             operatorID,
             operatorClassName,
             taskInfo.getIndexOfThisSubtask(),
             taskInfo.getNumberOfParallelSubtasks());
    
       final String operatorIdentifierText = operatorSubtaskDescription.toString();
       // First obtain the PrioritizedOperatorSubtaskState used for the restore
       final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =
          taskStateManager.prioritizedOperatorState(operatorID);
    
       AbstractKeyedStateBackend<?> keyedStatedBackend = null;
       OperatorStateBackend operatorStateBackend = null;
       CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
       CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
       InternalTimeServiceManager<?> timeServiceManager;
    
       try {
          // -------------- Keyed State Backend --------------
          keyedStatedBackend = keyedStatedBackend(
             keySerializer,
             operatorIdentifierText,
             prioritizedOperatorSubtaskStates,
             streamTaskCloseableRegistry,
             metricGroup);
             
          // -------------- Operator State Backend --------------
          operatorStateBackend = operatorStateBackend(
             operatorIdentifierText,
             prioritizedOperatorSubtaskStates,
             streamTaskCloseableRegistry);
             
          // -------------- Raw State Streams --------------
          rawKeyedStateInputs = rawKeyedStateInputs(
             prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
          streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
    
          rawOperatorStateInputs = rawOperatorStateInputs(
             prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
          streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
    
          // -------------- Internal Timer Service Manager --------------
          timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);
    
          // -------------- Preparing return value --------------
          return new StreamOperatorStateContextImpl(
             prioritizedOperatorSubtaskStates.isRestored(),
             operatorStateBackend,
             keyedStatedBackend,
             timeServiceManager,
             rawOperatorStateInputs,
             rawKeyedStateInputs);
       } catch (Exception ex) {
          // cleanup if something went wrong before results got published.
          // .........
       }
    }
}
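
How a prioritized list of alternatives is consumed during the restore can be sketched as follows. This is a minimal illustration of the "try the higher-priority handle first, fall back to the next one" idea described earlier, not the code of BackendRestorerProcedure. The class PrioritizedRestoreSketch, its createAndRestore signature, and the string "handles" are assumptions made for the example, and failing outright on an empty list is a simplification.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch: restore from the first alternative that works, in priority order. */
final class PrioritizedRestoreSketch {

    /**
     * Tries each alternative in order and returns the first successfully restored backend.
     * Failures of earlier alternatives are collected and attached if every attempt fails.
     */
    static <S, B> B createAndRestore(List<S> alternativesByPriority, Function<S, B> restoreFn) {
        RuntimeException collected = null;
        for (S alternative : alternativesByPriority) {
            try {
                return restoreFn.apply(alternative);
            } catch (RuntimeException e) {
                if (collected == null) {
                    collected = e;
                } else {
                    collected.addSuppressed(e);
                }
            }
        }
        throw new IllegalStateException("Could not restore from any alternative", collected);
    }

    public static void main(String[] args) {
        // Local copy first, the handle from the JobManager last (lowest priority, always complete).
        List<String> prioritized = Arrays.asList("local-copy", "job-manager-handle");

        String backend = createAndRestore(prioritized, handle -> {
            if (handle.startsWith("local")) {
                // Simulate a missing local file after the task was rescheduled to another machine.
                throw new IllegalStateException("local file missing");
            }
            return "backend restored from " + handle;
        });

        System.out.println(backend); // prints "backend restored from job-manager-handle"
    }
}
```

Because the handle from the JobManager is always the last entry, a broken or missing local copy costs only the time of the failed attempt; it does not change the result of the restore.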
           
