Snapshot Strategy (SnapshotStrategy)
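Flink's checkpointing mechanism is built on distributed consistent snapshots, which is how it provides exactly-once processing semantics. Both keyed state (`HeapKeyedStateBackend`, `RocksDBKeyedStateBackend`) and operator state (`DefaultOperatorStateBackend`) receive snapshot requests through their `snapshot` method. The actual snapshot work is delegated to a concrete snapshot strategy.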
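Below is the UML diagram of Flink's snapshot strategies. For keyed state, `HeapSnapshotStrategy` and `RocksDBSnapshotStrategyBase` are the snapshot strategies for the heap and RocksDB state backends respectively; RocksDB is further split into full and incremental snapshots. `DefaultOperatorStateBackendSnapshotStrategy` is the snapshot strategy for the operator state backend. Beyond keyed and operator state, a savepoint is essentially a special kind of snapshot, so the savepoint strategy `SavepointSnapshotStrategy` also implements the `SnapshotStrategy` interface.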

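Below is the `SnapshotStrategy` interface, which defines the steps needed to take a snapshot:
- A synchronous part, which produces the resources the snapshot needs and prepares them for writing the snapshot data in the next step.
- An asynchronous part, which writes the snapshot data to streams created by the provided `CheckpointStreamFactory`.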
```java
public interface SnapshotStrategy<S extends StateObject, SR extends SnapshotResources> {
    // Synchronous part of the snapshot: prepares the resources needed to take the snapshot.
    SR syncPrepareResources(long checkpointId) throws Exception;

    // Asynchronous part: writes the snapshot data to streams obtained from the CheckpointStreamFactory.
    SnapshotResultSupplier<S> asyncSnapshot(
            SR syncPartResource,
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions);

    // Supplier that runs the asynchronous part of the snapshot.
    @FunctionalInterface
    interface SnapshotResultSupplier<S extends StateObject> {
        // Performs the asynchronous part of a checkpoint and returns the snapshot result.
        SnapshotResult<S> get(CloseableRegistry snapshotCloseableRegistry) throws Exception;
    }
}
```
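The following is a minimal, self-contained sketch of the sync/async split. It does not use Flink. `SimpleSnapshotStrategy`, `SnapshotSupplier` and `MapSnapshotStrategy` are hypothetical names.

- The sync phase copies a live `Map`, which plays the role of the resources.
- The async phase returns a supplier that serializes that copy into a byte array. The byte array stands in for the stream a `CheckpointStreamFactory` would provide.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified analogue of SnapshotStrategy (not Flink's API).
interface SimpleSnapshotStrategy<R, S> {
    // Synchronous part: runs on the task thread and captures what must be written.
    R syncPrepareResources(long checkpointId);

    // Asynchronous part: returns work that may run later on another thread.
    SnapshotSupplier<S> asyncSnapshot(R resources, long checkpointId);

    @FunctionalInterface
    interface SnapshotSupplier<S> {
        S get() throws IOException;
    }
}

// Snapshots a String -> Long map into bytes.
class MapSnapshotStrategy implements SimpleSnapshotStrategy<Map<String, Long>, byte[]> {
    private final Map<String, Long> liveState;

    MapSnapshotStrategy(Map<String, Long> liveState) {
        this.liveState = liveState;
    }

    @Override
    public Map<String, Long> syncPrepareResources(long checkpointId) {
        // Copy now, so later updates to liveState do not leak into this snapshot.
        return new TreeMap<>(liveState);
    }

    @Override
    public SnapshotSupplier<byte[]> asyncSnapshot(Map<String, Long> resources, long checkpointId) {
        return () -> {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeLong(checkpointId);
                out.writeInt(resources.size());
                for (Map.Entry<String, Long> e : resources.entrySet()) {
                    out.writeUTF(e.getKey());
                    out.writeLong(e.getValue());
                }
            }
            return bytes.toByteArray();
        };
    }
}
```

The copy is taken in the synchronous part. A write that happens after `syncPrepareResources` returns, but before the supplier runs, therefore does not appear in the snapshot.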
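The UML diagram of `SnapshotResources`:
- For full snapshots, `FullSnapshotResources` has two implementations: `HeapSnapshotResources` for the heap backend and `RocksDBFullSnapshotResources` for RocksDB full snapshots.
- The resource class for RocksDB incremental snapshots is `IncrementalRocksDBSnapshotResources`.
- The resource class for operator state snapshots is `DefaultOperatorStateBackendSnapshotResources`.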
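The `SnapshotResources` interface is defined below. It declares a single method, `release`, which cleans up the resources after the asynchronous part of the snapshot has finished.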
```java
@Internal
public interface SnapshotResources {
    /** Cleans up the resources after the asynchronous part is done. */
    void release();
}
```
We will look at the concrete resource classes together with their snapshot strategies.
Heap Snapshot Strategy (HeapSnapshotStrategy)
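Before looking at the heap snapshot strategy, let's first look at its resource class, `HeapSnapshotResources`.

The UML diagram above shows that both the heap snapshot resources and the RocksDB full snapshot resources implement `FullSnapshotResources`. This also shows that the heap state backend has no incremental snapshot implementation.

`FullSnapshotResources` describes the resources of a full snapshot independently of the concrete state backend. Such resources can be written out by `FullSnapshotAsyncWriter`.

The `FullSnapshotResources` interface is defined below; the type parameter `K` is the type of the keys stored in the backend.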
```java
public interface FullSnapshotResources<K> extends SnapshotResources {
    // Returns the meta info snapshots of this state snapshot; each StateMetaInfoSnapshot records
    // the snapshot metadata of one state, such as the state name, backend state type and serializers.
    List<StateMetaInfoSnapshot> getMetaInfoSnapshots();

    // Creates an iterator over the entries of the current snapshot.
    KeyValueStateIterator createKVStateIterator() throws IOException;

    // The KeyGroupRange covered by the current snapshot.
    KeyGroupRange getKeyGroupRange();

    /** Returns key {@link TypeSerializer}. */
    TypeSerializer<K> getKeySerializer();

    /** Returns the {@link StreamCompressionDecorator} that should be used for writing. */
    StreamCompressionDecorator getStreamCompressionDecorator();
}
```
Next, let's look at the two core methods of `HeapSnapshotStrategy`, `syncPrepareResources` and `asyncSnapshot`:
```java
class HeapSnapshotStrategy<K>
        implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotResources<K>> {
    ...
    // Prepares the snapshot resources (HeapSnapshotResources).
    @Override
    public HeapSnapshotResources<K> syncPrepareResources(long checkpointId) {
        return HeapSnapshotResources.create(
                registeredKVStates,
                registeredPQStates,
                keyGroupCompressionDecorator,
                keyGroupRange,
                getKeySerializer(),
                totalKeyGroups);
    }

    @Override
    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
            HeapSnapshotResources<K> syncPartResource,
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions) {
        ......
        // SupplierWithException is a Supplier-like functional interface that may throw: the first
        // type parameter is the result type, the second is the exception type it may throw.
        final SupplierWithException<CheckpointStreamWithResultProvider, Exception>
                checkpointStreamSupplier =
                        localRecoveryConfig.isLocalRecoveryEnabled() // is local recovery enabled?
                                        && !checkpointOptions.getCheckpointType().isSavepoint()
                                ? () ->
                                        // local recovery enabled and not a savepoint: duplicating stream
                                        createDuplicatingStream(
                                                checkpointId,
                                                CheckpointedStateScope.EXCLUSIVE,
                                                streamFactory,
                                                localRecoveryConfig
                                                        .getLocalStateDirectoryProvider())
                                : () ->
                                        // no local recovery, or a savepoint: simple stream
                                        createSimpleStream(
                                                CheckpointedStateScope.EXCLUSIVE, streamFactory);

        return (snapshotCloseableRegistry) -> {
            ......
            // Output stream for the snapshot data
            final CheckpointStreamFactory.CheckpointStateOutputStream localStream =
                    streamWithResultProvider.getCheckpointOutputStream();

            // Write the checkpoint data through the KeyedBackendSerializationProxy
            final DataOutputViewStreamWrapper outView =
                    new DataOutputViewStreamWrapper(localStream);
            serializationProxy.write(outView);
            ......
        };
    }
}
```
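The `asyncSnapshot` method above creates the snapshot output stream through `CheckpointStreamWithResultProvider`. At its core, this class encapsulates obtaining the output stream(s):
- Without local recovery, only one output stream is created. It writes the snapshot data to the checkpoint storage configured for the job.
- With local recovery enabled, the state data must also be written locally, so two output streams are obtained. One writes to the configured checkpoint storage and the other writes the local copy.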
```java
public interface CheckpointStreamWithResultProvider extends Closeable {
    // Closes the output stream(s) and returns the snapshot result with the stream handle(s).
    @Nonnull
    SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException;

    // Returns the snapshot output stream.
    @Nonnull
    CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream();

    @Override
    default void close() throws IOException {
        getCheckpointOutputStream().close();
    }
    ...
}
```
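Accordingly, the two inner implementation classes of `CheckpointStreamWithResultProvider` correspond to the simple stream and the duplicating stream:
- `PrimaryStreamOnly` creates a single output stream, which writes to the configured checkpoint storage (for example a remote HDFS or the JobManager).
- `PrimaryAndSecondaryStream` creates two output streams. The first is the same as in `PrimaryStreamOnly`; the second writes to local storage on the TaskManager and is used for local recovery.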

Creating a simple stream: as shown below, only a primary stream is created.
```java
static CheckpointStreamWithResultProvider createSimpleStream(
        @Nonnull CheckpointedStateScope checkpointedStateScope,
        @Nonnull CheckpointStreamFactory primaryStreamFactory)
        throws IOException {
    // Create the primary output stream
    CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
            primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
    return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
}
```
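Creating a duplicating stream: in addition to the primary stream, a secondary stream that writes to a local file is created. If the secondary stream cannot be opened, a warning is logged and only the primary stream is used.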
```java
@Nonnull
static CheckpointStreamWithResultProvider createDuplicatingStream(
        @Nonnegative long checkpointId,
        @Nonnull CheckpointedStateScope checkpointedStateScope,
        @Nonnull CheckpointStreamFactory primaryStreamFactory,
        @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider)
        throws IOException {
    CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
            primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
    try {
        // Local file that the checkpoint data is written to
        File outFile =
                new File(
                        secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(
                                checkpointId),
                        String.valueOf(UUID.randomUUID()));
        Path outPath = new Path(outFile.toURI());
        // Output stream that writes to that file
        CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
                new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);
        return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(
                primaryOut, secondaryOut);
    } catch (IOException secondaryEx) {
        LOG.warn(
                "Exception when opening secondary/local checkpoint output stream. "
                        + "Continue only with the primary stream.",
                secondaryEx);
    }
    return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
}
```
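`createDuplicatingStream` falls back to the primary stream alone when the secondary (local) stream cannot be opened. The sketch below applies the same idea to writes: the primary must succeed, and the secondary is best effort. This is not Flink's implementation. `DuplicatingOutputStream` and `shouldDuplicate` are hypothetical names written for illustration, and `shouldDuplicate` mirrors the condition used in `HeapSnapshotStrategy.asyncSnapshot`.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch: the primary copy must succeed, the secondary (local) copy is best effort.
class DuplicatingOutputStream extends OutputStream {
    private final OutputStream primary;
    private OutputStream secondary; // set to null after it fails
    private IOException secondaryFailure;

    DuplicatingOutputStream(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    // Same condition as in HeapSnapshotStrategy.asyncSnapshot: duplicate only for
    // checkpoints (not savepoints) when local recovery is enabled.
    static boolean shouldDuplicate(boolean localRecoveryEnabled, boolean isSavepoint) {
        return localRecoveryEnabled && !isSavepoint;
    }

    @Override
    public void write(int b) throws IOException {
        primary.write(b); // a primary failure propagates and fails the snapshot
        if (secondary != null) {
            try {
                secondary.write(b);
            } catch (IOException e) {
                // losing the local copy only means no local recovery for this snapshot
                secondaryFailure = e;
                secondary = null;
            }
        }
    }

    boolean hasSecondary() {
        return secondary != null;
    }

    IOException getSecondaryFailure() {
        return secondaryFailure;
    }

    @Override
    public void close() throws IOException {
        try {
            primary.close();
        } finally {
            if (secondary != null) {
                try {
                    secondary.close();
                } catch (IOException e) {
                    secondaryFailure = e;
                }
            }
        }
    }
}
```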
The `CheckpointStreamFactory` above creates the output streams that write checkpoint data to external storage. For example, `FsCheckpointStreamFactory` writes checkpoint data to an external file system.

```java
public interface CheckpointStreamFactory {
    // Creates a new state output stream; CheckpointStateOutputStream is a static abstract
    // class nested inside CheckpointStreamFactory.
    CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope)
            throws IOException;

    // Base class of the state output streams; the implementations live in the
    // CheckpointStreamFactory implementations.
    abstract class CheckpointStateOutputStream extends FSDataOutputStream {
        // Closes the stream and returns a handle to the written data.
        @Nullable
        public abstract StreamStateHandle closeAndGetHandle() throws IOException;

        // Closes the stream.
        @Override
        public abstract void close() throws IOException;
    }
}
```
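The following hypothetical file-backed stream illustrates the two ways of closing a `CheckpointStateOutputStream`. `FileStateOutputStream` and `FileHandle` are made-up names.

- `closeAndGetHandle()` is the success path: it keeps the data and returns a handle.
- Plain `close()` is treated as the failure or cancellation path: it deletes the partial file.

The discard-on-close behavior is an assumption chosen for this sketch, not something stated in the interface above.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical handle type standing in for StreamStateHandle.
final class FileHandle {
    final Path path;
    final long size;

    FileHandle(Path path, long size) {
        this.path = path;
        this.size = size;
    }
}

// Hypothetical checkpoint output stream backed by a local file.
class FileStateOutputStream extends OutputStream {
    private final Path path;
    private final OutputStream out;
    private long written;
    private boolean closed;

    FileStateOutputStream(Path path) throws IOException {
        this.path = path;
        this.out = Files.newOutputStream(path);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        written++;
    }

    // Success path: keep the file and hand back a handle describing it.
    FileHandle closeAndGetHandle() throws IOException {
        closed = true;
        out.close();
        return new FileHandle(path, written);
    }

    // Failure or cancellation path (assumed semantics): close and discard the data.
    @Override
    public void close() throws IOException {
        if (!closed) {
            closed = true;
            out.close();
            Files.deleteIfExists(path);
        }
    }
}
```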
RocksDB Snapshot Strategies
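As the UML diagram above shows, the RocksDB snapshot strategies consist of three core classes:
- The abstract class `RocksDBSnapshotStrategyBase`, which defines member variables related to RocksDB and state.
- The full snapshot strategy `RocksFullSnapshotStrategy`.
- The incremental snapshot strategy `RocksIncrementalSnapshotStrategy`.

The concrete logic lives in the two subclasses.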
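`RocksFullSnapshotStrategy` creates full snapshots of a `RocksDBKeyedStateBackend`: every checkpoint writes the complete state data to the remote checkpoint storage (the JobManager or HDFS).
Again, let's look at the core methods, `syncPrepareResources` and `asyncSnapshot`.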
```java
public class RocksFullSnapshotStrategy<K>
        extends RocksDBSnapshotStrategyBase<K, FullSnapshotResources<K>> {
    ......
    @Override
    public FullSnapshotResources<K> syncPrepareResources(long checkpointId) throws Exception {
        // Build the RocksDB full snapshot resources. Compared with HeapSnapshotResources,
        // RocksDBFullSnapshotResources also holds the RocksDB instance and a RocksDB snapshot.
        return RocksDBFullSnapshotResources.create(
                kvStateInformation,
                registeredPQStates,
                db,
                rocksDBResourceGuard,
                keyGroupRange,
                keySerializer,
                keyGroupPrefixBytes,
                keyGroupCompressionDecorator);
    }

    @Override
    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
            FullSnapshotResources<K> fullRocksDBSnapshotResources,
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory checkpointStreamFactory,
            @Nonnull CheckpointOptions checkpointOptions) {
        if (fullRocksDBSnapshotResources.getMetaInfoSnapshots().isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
                        timestamp);
            }
            return registry -> SnapshotResult.empty();
        }
        // As in the heap strategy, createCheckpointStreamSupplier creates a duplicating or
        // a simple stream depending on whether local recovery is enabled.
        final SupplierWithException<CheckpointStreamWithResultProvider, Exception>
                checkpointStreamSupplier =
                        createCheckpointStreamSupplier(
                                checkpointId, checkpointStreamFactory, checkpointOptions);
        // Create the asynchronous full snapshot writer
        return new FullSnapshotAsyncWriter<>(
                checkpointOptions.getCheckpointType(),
                checkpointStreamSupplier,
                fullRocksDBSnapshotResources);
    }
    ......
}
```
`FullSnapshotAsyncWriter` is also a `SnapshotResultSupplier`; it asynchronously writes the full snapshot data to the given output stream.
```java
public class FullSnapshotAsyncWriter<K>
        implements SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> {
    @Override
    public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
            throws Exception {
        ......
        // Obtain the output stream(s)
        final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider =
                checkpointStreamSupplier.get();
        snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
        // Write the snapshot data to the output stream
        writeSnapshotToOutputStream(checkpointStreamWithResultProvider, keyGroupRangeOffsets);
        ......
    }

    private void writeSnapshotToOutputStream(
            @Nonnull CheckpointStreamWithResultProvider checkpointStreamWithResultProvider,
            @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets)
            throws IOException, InterruptedException {
        // Write through an output view; note that checkpointStreamWithResultProvider
        // may write two copies of the data (primary and local).
        final DataOutputView outputView =
                new DataOutputViewStreamWrapper(
                        checkpointStreamWithResultProvider.getCheckpointOutputStream());
        // Write the metadata
        writeKVStateMetaData(outputView);
        // Write the data of every state instance
        try (KeyValueStateIterator kvStateIterator = snapshotResources.createKVStateIterator()) {
            writeKVStateData(
                    kvStateIterator, checkpointStreamWithResultProvider, keyGroupRangeOffsets);
        }
    }
}
```
Now for the key method, `writeKVStateData`: how is the full data actually written out? Setting the details aside, it simply iterates over the `KeyValueStateIterator`:
```java
private void writeKVStateData(
        final KeyValueStateIterator mergeIterator,
        final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider,
        final KeyGroupRangeOffsets keyGroupRangeOffsets)
        throws IOException, InterruptedException {
    ......
    try {
        ......
        // Simply iterate over the KeyValueStateIterator.
        // main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking
        // key-group offsets.
        while (mergeIterator.isValid()) {
            ......
            writeKeyValuePair(previousKey, previousValue, kgOutView);
            ......
            // request next k/v pair
            previousKey = mergeIterator.key();
            previousValue = mergeIterator.value();
            mergeIterator.next();
        }
        ......
    } finally {
        // this will just close the outer stream
        IOUtils.closeQuietly(kgOutStream);
    }
}
```
A `KeyValueStateIterator` iterates over all key-value entries of the current snapshot; RocksDB and the heap backend each have their own iterator implementation.
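The comment in `writeKVStateData` says the pairs are written ordered by (key group, state) while tracking key-group offsets. The sketch below shows that idea with plain `java.io`.

- Entries arrive sorted by key group.
- The writer records the byte offset at which each key group starts.
- Recording these offsets is what allows individual key groups to be located again later, for example when restoring after rescaling.

`KvEntry`, `KeyGroupWriter` and the byte layout are hypothetical. The metadata that Flink writes first (`writeKVStateMetaData`) is omitted.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical pre-serialized entry; the byte layout below is invented for the sketch.
final class KvEntry {
    final int keyGroup;
    final int stateId;
    final byte[] key;
    final byte[] value;

    KvEntry(int keyGroup, int stateId, byte[] key, byte[] value) {
        this.keyGroup = keyGroup;
        this.stateId = stateId;
        this.key = key;
        this.value = value;
    }
}

final class KeyGroupWriter {
    // Writes entries sorted by (keyGroup, stateId) and returns, for each key group in
    // [firstKeyGroup, firstKeyGroup + numKeyGroups), the offset where its data starts
    // (-1 if the key group has no data).
    static long[] write(List<KvEntry> sorted, int firstKeyGroup, int numKeyGroups,
                        DataOutputStream out) throws IOException {
        long[] offsets = new long[numKeyGroups];
        Arrays.fill(offsets, -1L);
        for (KvEntry e : sorted) {
            int index = e.keyGroup - firstKeyGroup;
            if (offsets[index] < 0) {
                offsets[index] = out.size(); // first entry of this key group
            }
            out.writeShort(e.stateId);
            out.writeInt(e.key.length);
            out.write(e.key);
            out.writeInt(e.value.length);
            out.write(e.value);
        }
        return offsets;
    }
}
```

With one-byte keys and values, each entry takes 12 bytes. Given entries in key groups 0, 0 and 2, the offsets come out as `{0, -1, 24}`.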

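Let's see how `RocksStatesPerKeyGroupMergeIterator` is created. When we looked at the `FullSnapshotResources` interface above, we saw the abstract method `createKVStateIterator`, which exists specifically to create this iterator. `HeapSnapshotResources` and `RocksDBFullSnapshotResources` each implement it to create the heap and RocksDB iterators respectively. Below is the implementation of `RocksDBFullSnapshotResources.createKVStateIterator`: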
```java
@Override
public KeyValueStateIterator createKVStateIterator() throws IOException {
    ......
    try {
        // Create RocksDB ReadOptions that read from the RocksDB snapshot taken
        // during the synchronous phase of the checkpoint
        ReadOptions readOptions = new ReadOptions();
        closeableRegistry.registerCloseable(readOptions::close);
        readOptions.setSnapshot(snapshot);
        // RocksIteratorWrapper is a thin wrapper around a RocksDB RocksIterator
        List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
                createKVStateIterators(closeableRegistry, readOptions);
        ......
        // RocksStatesPerKeyGroupMergeIterator merges the iterators of multiple
        // state instances (column families) into a single iterator
        return new RocksStatesPerKeyGroupMergeIterator(
                closeableRegistry,
                kvStateIterators,
                heapPriorityQueueIterators,
                keyGroupPrefixBytes);
    } catch (Throwable t) {
        IOUtils.closeQuietly(closeableRegistry);
        throw new IOException("Error creating merge iterator", t);
    }
}

private List<Tuple2<RocksIteratorWrapper, Integer>> createKVStateIterators(
        CloseableRegistry closeableRegistry, ReadOptions readOptions) throws IOException {
    final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
            new ArrayList<>(metaData.size());
    int kvStateId = 0;
    // One iterator per state, i.e. per RocksDB column family
    for (MetaData metaDataEntry : metaData) {
        RocksIteratorWrapper rocksIteratorWrapper =
                createRocksIteratorWrapper(
                        db,
                        metaDataEntry.rocksDbKvStateInfo.columnFamilyHandle,
                        metaDataEntry.stateSnapshotTransformer,
                        readOptions);
        kvStateIterators.add(Tuple2.of(rocksIteratorWrapper, kvStateId));
        closeableRegistry.registerCloseable(rocksIteratorWrapper);
        ++kvStateId;
    }
    return kvStateIterators;
}

private static RocksIteratorWrapper createRocksIteratorWrapper(
        RocksDB db,
        ColumnFamilyHandle columnFamilyHandle,
        StateSnapshotTransformer<byte[]> stateSnapshotTransformer,
        ReadOptions readOptions) {
    // Create a RocksDB iterator and wrap it in Flink's RocksIteratorWrapper
    RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
    return stateSnapshotTransformer == null
            ? new RocksIteratorWrapper(rocksIterator)
            : new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer);
}
```
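The code above shows that these iterators are essentially RocksDB's own iterators, reading from the specified snapshot. Flink wraps each one in a `RocksIteratorWrapper`.

For why the extra layer is needed, see the RocksDB documentation on iterator error handling: an iterator becomes invalid both at the end of the data and when an error occurs, so the status must be checked separately.

Because there can be multiple state instances, each with its own iterator, Flink finally combines them into a single iterator, `RocksStatesPerKeyGroupMergeIterator`.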
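Conceptually, merging the per-state iterators is a k-way merge. The sketch below is a simplified model, not the code of `RocksStatesPerKeyGroupMergeIterator`; `KeyedEntry` and `PerKeyGroupMerge` are hypothetical names.

- It assumes each input iterator is already sorted by key group. In RocksDB the key group is a prefix of the serialized key.
- It uses a `PriorityQueue` ordered by (key group, state id).
- The output is therefore grouped by key group and, within a key group, by state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical entry: in RocksDB the key group would be a prefix of the key bytes.
final class KeyedEntry {
    final int keyGroup;
    final String key;

    KeyedEntry(int keyGroup, String key) {
        this.keyGroup = keyGroup;
        this.key = key;
    }
}

final class PerKeyGroupMerge {
    // Current head of one state's iterator, tagged with that state's id.
    private static final class Head {
        final KeyedEntry entry;
        final int stateId;
        final Iterator<KeyedEntry> rest;

        Head(KeyedEntry entry, int stateId, Iterator<KeyedEntry> rest) {
            this.entry = entry;
            this.stateId = stateId;
            this.rest = rest;
        }
    }

    // Merges per-state iterators (each sorted by key group) into one sequence ordered by
    // (key group, state id); each result is rendered as "keyGroup/stateId/key".
    static List<String> merge(List<Iterator<KeyedEntry>> perState) {
        PriorityQueue<Head> heap = new PriorityQueue<>(
                Comparator.<Head>comparingInt(h -> h.entry.keyGroup)
                        .thenComparingInt(h -> h.stateId));
        for (int id = 0; id < perState.size(); id++) {
            Iterator<KeyedEntry> it = perState.get(id);
            if (it.hasNext()) {
                heap.add(new Head(it.next(), id, it));
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head head = heap.poll();
            merged.add(head.entry.keyGroup + "/" + head.stateId + "/" + head.entry.key);
            if (head.rest.hasNext()) {
                heap.add(new Head(head.rest.next(), head.stateId, head.rest));
            }
        }
        return merged;
    }
}
```

Only one head per state is in the queue at any time, so each state's own key order is preserved.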
Incremental Snapshots
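`RocksIncrementalSnapshotStrategy` is the incremental snapshot strategy of `RocksDBKeyedStateBackend`; it implements incremental snapshots on top of RocksDB's native checkpoints.
Before looking at its `syncPrepareResources` and `asyncSnapshot`, let's look at some key member variables used by RocksDB incremental snapshots.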
```java
// The resources of a RocksDB incremental snapshot are held by the inner class IncrementalRocksDBSnapshotResources
public class RocksIncrementalSnapshotStrategy<K>
        extends RocksDBSnapshotStrategyBase<
                K, RocksIncrementalSnapshotStrategy.IncrementalRocksDBSnapshotResources> {
    // Base directory of the RocksDB instance
    @Nonnull private final File instanceBasePath;

    // Unique ID of this state backend instance
    @Nonnull private final UUID backendUID;

    // Maps each checkpoint ID to the set of sst files materialized in that checkpoint
    @Nonnull private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;

    // ID of the last completed checkpoint
    private long lastCompletedCheckpointId;

    // Uploads snapshot files (the sst files etc. produced by the RocksDB checkpoint)
    private final RocksDBStateUploader stateUploader;
    ...
}
```
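Next, the synchronous resource preparation phase, which mainly does two things:
- It gets the sst files of the last completed checkpoint from `materializedSstFiles`; they are later used to determine which files are new.
- It creates a RocksDB checkpoint.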
```java
@Override
public IncrementalRocksDBSnapshotResources syncPrepareResources(long checkpointId)
        throws Exception {
    // Prepare the directory: permanent if local recovery is enabled, temporary otherwise
    final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
    LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);
    final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
            new ArrayList<>(kvStateInformation.size());
    // sst files of the last completed checkpoint, used as the base for the incremental diff
    final Set<StateHandleID> baseSstFiles =
            snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
    // Create the RocksDB checkpoint
    takeDBNativeCheckpoint(snapshotDirectory);
    return new IncrementalRocksDBSnapshotResources(
            snapshotDirectory, baseSstFiles, stateMetaInfoSnapshots);
}
```
`takeDBNativeCheckpoint` synchronously creates a RocksDB checkpoint; the checkpoint data (sst files and misc files) is generated in the specified directory.
```java
private void takeDBNativeCheckpoint(@Nonnull SnapshotDirectory outputDirectory)
        throws Exception {
    try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();
            Checkpoint checkpoint = Checkpoint.create(db)) {
        checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
    } catch (Exception ex) {
        ......
    }
}
```
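`takeDBNativeCheckpoint` holds a lease from `rocksDBResourceGuard` while it creates the checkpoint, presumably so that the RocksDB instance cannot be disposed of underneath it. The hypothetical `SimpleResourceGuard` below sketches that lease pattern; it is not Flink's `ResourceGuard`.

- Leases are `AutoCloseable`, so they fit try-with-resources.
- `close()` waits until all leases are returned.
- No new lease is handed out after `close()`.

```java
// Hypothetical lease-based guard in the spirit of rocksDBResourceGuard (not Flink's class).
final class SimpleResourceGuard implements AutoCloseable {
    private final Object lock = new Object();
    private int leases;
    private boolean closed;

    // Returned by acquire(); closing it gives the lease back.
    final class Lease implements AutoCloseable {
        private boolean released;

        @Override
        public void close() {
            synchronized (lock) {
                if (!released) {
                    released = true;
                    leases--;
                    lock.notifyAll();
                }
            }
        }
    }

    Lease acquire() {
        synchronized (lock) {
            if (closed) {
                throw new IllegalStateException("resource already closed");
            }
            leases++;
            return new Lease();
        }
    }

    int leaseCount() {
        synchronized (lock) {
            return leases;
        }
    }

    // Forbids new leases, then blocks until every outstanding lease is released.
    @Override
    public void close() throws InterruptedException {
        synchronized (lock) {
            closed = true;
            while (leases > 0) {
                lock.wait();
            }
        }
    }
}
```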
`asyncSnapshot` is simple: it mainly creates a `RocksDBIncrementalSnapshotOperation`, the supplier that produces the incremental snapshot.
```java
@Override
public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
        IncrementalRocksDBSnapshotResources snapshotResources,
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory checkpointStreamFactory,
        @Nonnull CheckpointOptions checkpointOptions) {
    ...
    return new RocksDBIncrementalSnapshotOperation(
            checkpointId,
            checkpointStreamFactory,
            snapshotResources.snapshotDirectory, // directory of the RocksDB checkpoint
            snapshotResources.baseSstFiles, // sst files of the last completed checkpoint
            snapshotResources.stateMetaInfoSnapshots);
}
```
Now let's look at the core of the incremental snapshot implementation, `RocksDBIncrementalSnapshotOperation`:
```java
private final class RocksDBIncrementalSnapshotOperation
        implements SnapshotResultSupplier<KeyedStateHandle> {
    ...
    @Override
    public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
            throws Exception {
        ...
        // sst files of the current RocksDB checkpoint
        final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
        // misc files (metadata files) of the current RocksDB checkpoint
        final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
        ......
        // Upload the new sst files and the misc files; uploadSstFiles scans the RocksDB
        // checkpoint directory to find the sst files that are new
        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);
        // Record the sst files of the current checkpoint
        synchronized (materializedSstFiles) {
            materializedSstFiles.put(checkpointId, sstFiles.keySet());
        }
        ......
    }
}
```
Here is the implementation of the `uploadSstFiles` method used above:
```java
private void uploadSstFiles(
        @Nonnull Map<StateHandleID, StreamStateHandle> sstFiles,
        @Nonnull Map<StateHandleID, StreamStateHandle> miscFiles,
        @Nonnull CloseableRegistry snapshotCloseableRegistry)
        throws Exception {
    // Local paths of the new (incremental) sst files
    Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
    // Local paths of the misc files
    Map<StateHandleID, Path> miscFilePaths = new HashMap<>();
    // Files in the current RocksDB checkpoint directory
    Path[] files = localBackupDirectory.listDirectory();
    if (files != null) {
        // Find the incremental files
        createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
        // Upload the new sst files with stateUploader
        sstFiles.putAll(
                stateUploader.uploadFilesToCheckpointFs(
                        sstFilePaths, checkpointStreamFactory, snapshotCloseableRegistry));
        // Upload the misc files
        miscFiles.putAll(
                stateUploader.uploadFilesToCheckpointFs(
                        miscFilePaths, checkpointStreamFactory, snapshotCloseableRegistry));
    }
}
```
`createUploadFilePaths` compares the sst files against the base set to find the new ones, and collects the sst and misc files that need to be uploaded.
```java
private void createUploadFilePaths(
        Path[] files,
        Map<StateHandleID, StreamStateHandle> sstFiles,
        Map<StateHandleID, Path> sstFilePaths,
        Map<StateHandleID, Path> miscFilePaths) {
    for (Path filePath : files) {
        final String fileName = filePath.getFileName().toString();
        // Handle ID derived from the file name
        final StateHandleID stateHandleID = new StateHandleID(fileName);
        // Compare sst files against those of the last completed checkpoint to find new ones
        if (fileName.endsWith(SST_FILE_SUFFIX)) {
            final boolean existsAlready =
                    baseSstFiles != null && baseSstFiles.contains(stateHandleID);
            if (existsAlready) {
                // Already uploaded earlier and kept in the shared directory:
                // only a placeholder is recorded
                sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
            } else {
                // New file, to be uploaded
                sstFilePaths.put(stateHandleID, filePath);
            }
        } else {
            // All misc files are uploaded
            miscFilePaths.put(stateHandleID, filePath);
        }
    }
}
```
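The classification in `createUploadFilePaths` can be isolated as a pure function. In this sketch `UploadPlan` is a hypothetical class and files are identified by name only. `baseSstFiles` may be `null` when there is no earlier completed checkpoint, matching the null check above.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the classification done by createUploadFilePaths.
final class UploadPlan {
    static final String SST_FILE_SUFFIX = ".sst";

    final Set<String> placeholders = new TreeSet<>(); // sst files uploaded by an earlier checkpoint
    final Set<String> sstToUpload = new TreeSet<>();  // new sst files
    final Set<String> miscToUpload = new TreeSet<>(); // non-sst files, always uploaded

    // baseSstFiles may be null when there is no completed checkpoint to compare against.
    static UploadPlan classify(List<String> fileNames, Set<String> baseSstFiles) {
        UploadPlan plan = new UploadPlan();
        for (String name : fileNames) {
            if (name.endsWith(SST_FILE_SUFFIX)) {
                if (baseSstFiles != null && baseSstFiles.contains(name)) {
                    plan.placeholders.add(name);
                } else {
                    plan.sstToUpload.add(name);
                }
            } else {
                plan.miscToUpload.add(name);
            }
        }
        return plan;
    }
}
```

For example, suppose a checkpoint directory holds `000001.sst`, `000003.sst`, `MANIFEST-000004`, `CURRENT` and `OPTIONS-000005`, and the base set is `{000001.sst, 000002.sst}`. Then only `000003.sst` and the three misc files are uploaded; `000001.sst` becomes a placeholder.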
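In summary, the incremental snapshot works as follows:
- A RocksDB checkpoint produces the sst files of the current snapshot. Thanks to the LSM design, sst files are immutable.
- For each checkpoint, Flink records the mapping from the checkpoint ID to its sst files.
- The new sst files and the misc files of the current checkpoint are uploaded.
- If a later checkpoint still contains sst files from an earlier one, those files do not need to be uploaded to HDFS again.

Flink's incremental checkpointing thus makes clever use of a property of LSM trees: sst files are never modified after they are written, and new data only ever goes into new files.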
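The following simulation ties these steps together. `IncrementalSimulator` is hypothetical.

- The base set is looked up under the last completed checkpoint ID, as in `syncPrepareResources`.
- Only sst files outside that base set are reported for upload.
- `notifyCheckpointComplete` drops bookkeeping entries older than a completed checkpoint. This pruning is an assumption of the sketch; the article does not show how Flink prunes `materializedSstFiles`.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical simulation of the incremental-checkpoint bookkeeping (not Flink code).
final class IncrementalSimulator {
    // checkpoint ID -> sst files contained in that checkpoint
    private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
    private long lastCompletedCheckpointId = -1L;

    // Returns the sst files that would have to be uploaded for this checkpoint.
    Set<String> snapshot(long checkpointId, Set<String> currentSstFiles) {
        Set<String> base = materializedSstFiles.get(lastCompletedCheckpointId);
        Set<String> toUpload = new HashSet<>(currentSstFiles);
        if (base != null) {
            toUpload.removeAll(base); // already in shared storage: a placeholder is enough
        }
        materializedSstFiles.put(checkpointId, new HashSet<>(currentSstFiles));
        return toUpload;
    }

    // Assumption of this sketch: entries older than a completed checkpoint are no longer needed.
    void notifyCheckpointComplete(long checkpointId) {
        if (checkpointId > lastCompletedCheckpointId) {
            materializedSstFiles.headMap(checkpointId).clear();
            lastCompletedCheckpointId = checkpointId;
        }
    }

    int trackedCheckpoints() {
        return materializedSstFiles.size();
    }
}
```

For example, suppose checkpoint 1 contains `1.sst` and `2.sst` and has completed, and checkpoint 2 then contains `2.sst` and `3.sst`. Only `3.sst` needs to be uploaded for checkpoint 2.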
Operator State Snapshot Strategy
There is only one snapshot strategy for operator state, `DefaultOperatorStateBackendSnapshotStrategy`. It writes the snapshot data of the operator state's `ListState` and `BroadcastState` instances to the snapshot storage.
```java
class DefaultOperatorStateBackendSnapshotStrategy
        implements SnapshotStrategy<
                OperatorStateHandle,
                DefaultOperatorStateBackendSnapshotStrategy
                        .DefaultOperatorStateBackendSnapshotResources> {
    private final ClassLoader userClassLoader;
    // Operator state has only two kinds of state: ListState and BroadcastState
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

    protected DefaultOperatorStateBackendSnapshotStrategy(
            ClassLoader userClassLoader,
            Map<String, PartitionableListState<?>> registeredOperatorStates,
            Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates) {
        this.userClassLoader = userClassLoader;
        this.registeredOperatorStates = registeredOperatorStates;
        this.registeredBroadcastStates = registeredBroadcastStates;
    }
    ......
}
```
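In the synchronous resource preparation phase, `DefaultOperatorStateBackendSnapshotStrategy` does only one thing: it deep-copies the `ListState` and `BroadcastState` instances. The deep copy captures the state as of this moment, synchronously. Changes the task makes while the asynchronous part is still running then cannot leak into the snapshot, which is what exactly-once requires.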
```java
@Override
public DefaultOperatorStateBackendSnapshotResources syncPrepareResources(long checkpointId) {
    // Hold the copied operator states
    final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
            new HashMap<>(registeredOperatorStates.size());
    final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
            new HashMap<>(registeredBroadcastStates.size());
    ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(userClassLoader);
    try {
        // Deep-copy the list states for use in the asynchronous part
        if (!registeredOperatorStates.isEmpty()) {
            for (Map.Entry<String, PartitionableListState<?>> entry :
                    registeredOperatorStates.entrySet()) {
                PartitionableListState<?> listState = entry.getValue();
                if (null != listState) {
                    listState = listState.deepCopy();
                }
                registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
            }
        }
        // Deep-copy the broadcast states
        if (!registeredBroadcastStates.isEmpty()) {
            for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                    registeredBroadcastStates.entrySet()) {
                BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                if (null != broadcastState) {
                    broadcastState = broadcastState.deepCopy();
                }
                registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
            }
        }
    } finally {
        Thread.currentThread().setContextClassLoader(snapshotClassLoader);
    }
    return new DefaultOperatorStateBackendSnapshotResources(
            registeredOperatorStatesDeepCopies, registeredBroadcastStatesDeepCopies);
}
```
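Why a deep copy rather than a copy of the list alone? If the elements are mutable, a shallow copy would still share them with the live state. The sketch below uses a hypothetical `SimpleListState` whose `deepCopy()` copies every element through a copy function. That function stands in for what is presumably the element serializer's copy in Flink. After the snapshot is taken, neither appending to nor mutating the live state affects the snapshot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical list state with mutable elements (not Flink's PartitionableListState).
final class SimpleListState<T> {
    private final List<T> elements = new ArrayList<>();
    private final UnaryOperator<T> elementCopier; // stands in for a serializer's copy()

    SimpleListState(UnaryOperator<T> elementCopier) {
        this.elementCopier = elementCopier;
    }

    void add(T value) {
        elements.add(value);
    }

    List<T> get() {
        return elements;
    }

    // Copies the list and every element, so later changes made by the task thread
    // cannot change what the asynchronous part writes.
    SimpleListState<T> deepCopy() {
        SimpleListState<T> copy = new SimpleListState<>(elementCopier);
        for (T element : elements) {
            copy.elements.add(elementCopier.apply(element));
        }
        return copy;
    }
}
```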
Once the operator state has been deep-copied, `asyncSnapshot` asynchronously writes the snapshot data to an output stream created by the `CheckpointStreamFactory`.
```java
@Override
public SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(
        DefaultOperatorStateBackendSnapshotResources syncPartResource,
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) {
    ......
    return (snapshotCloseableRegistry) -> {
        // Create the output stream
        CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                streamFactory.createCheckpointStateOutputStream(
                        CheckpointedStateScope.EXCLUSIVE);
        snapshotCloseableRegistry.registerCloseable(localOut);
        ......
        // Write the snapshot data to the output stream through OperatorBackendSerializationProxy
        DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
        OperatorBackendSerializationProxy backendSerializationProxy =
                new OperatorBackendSerializationProxy(
                        operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
        backendSerializationProxy.write(dov);
        ......
        if (/* ...... elided: localOut is still registered, i.e. the snapshot was not cancelled */) {
            ......
            return SnapshotResult.of(retValue);
        } else {
            throw new IOException("Stream was already unregistered.");
        }
    };
}
```
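The `Stream was already unregistered.` branch handles cancellation:

- The stream is registered with `snapshotCloseableRegistry`, so cancelling the snapshot can close it.
- Before turning the stream into a handle, the writer must take ownership back by unregistering it.
- If the stream has already been removed, the snapshot was cancelled, and the writer throws instead.

`SimpleCloseableRegistry` and `AsyncWriteSketch` below are hypothetical, minimal stand-ins for that pattern, not Flink's `CloseableRegistry`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical minimal registry in the spirit of a closeable registry (not Flink's class).
final class SimpleCloseableRegistry implements Closeable {
    private final Set<Closeable> registered = new LinkedHashSet<>();
    private boolean closed;

    synchronized void register(Closeable c) throws IOException {
        if (closed) {
            c.close();
            throw new IOException("registry already closed");
        }
        registered.add(c);
    }

    // Returns false if the resource was already removed, e.g. by close() during cancellation.
    synchronized boolean unregister(Closeable c) {
        return registered.remove(c);
    }

    // Cancellation path: close everything that is still registered.
    @Override
    public void close() throws IOException {
        Set<Closeable> toClose;
        synchronized (this) {
            closed = true;
            toClose = new LinkedHashSet<>(registered);
            registered.clear();
        }
        for (Closeable c : toClose) {
            c.close();
        }
    }
}

final class AsyncWriteSketch {
    // Registers the stream, writes, then takes ownership back before producing a "handle".
    static String writeAndFinish(SimpleCloseableRegistry registry, Closeable stream)
            throws IOException {
        registry.register(stream);
        // ... write snapshot bytes here ...
        if (registry.unregister(stream)) {
            stream.close();
            return "handle";
        } else {
            throw new IOException("Stream was already unregistered.");
        }
    }
}
```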