天天看点

3.8、Flink流处理(Stream API)- State & Fault Tolerance(状态和容错)之 Custom Serialization for Managed使用自定义状态序列化器 状态序列化器和模式演化预定义的方便的TypeSerializerSnapshot类实施说明和最佳实践在 Flink 1.7 之前从废弃的序列化器快照APIs迁移

目录

使用自定义状态序列化器

 状态序列化器和模式演化

TypeSerializerSnapshot 抽象

Flink 如何与 TypeSerializer 和 typeseriizersnapshot 抽象交互

堆外状态后端(例如rocksdbstateback后端)

堆状态后端(例如 memorystateback、fsstateback )

预定义的方便的TypeSerializerSnapshot类

实现一个 SimpleTypeSerializerSnapshot

实现一个 CompositeTypeSerializerSnapshot

实施说明和最佳实践

1、通过用类名实例化 Flink 恢复序列化程序快照

2、避免在不同的序列化器之间共享相同的 typeseriizersnapshot 类

3、避免对序列化器快照内容使用Java序列化

在 Flink 1.7 之前从废弃的序列化器快照APIs迁移

本页面的目标是为需要使用自定义状态序列化的用户提供指南,包括如何提供自定义状态序列化器,以及实现允许状态模式演化的序列化器的指南和最佳实践。

使用自定义状态序列化器

在注册 managed operator 或 managed operator 时,需要一个

StateDescriptor 

来指定状态的名称以及关于状态类型的信息。类型信息由 Flink 的类型序列化框架用于为状态创建适当的序列化器。

也可以完全绕过这一点,让Flink使用您自己的自定义序列化器来序列化 managed states,只需用您自己的

TypeSerializer 

实现直接实例化

StateDescriptor:

class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}

val descriptor = new ListStateDescriptor[(String, Integer)](
    "state-name",
    new CustomTypeSerializer)
)

checkpointedState = getRuntimeContext.getListState(descriptor)
           

 状态序列化器和模式演化

本节解释与状态序列化和模式演化相关的面向用户的抽象,以及关于 Flink 如何与这些抽象交互的必要内部细节。

从保存点恢复时,Flink允许更改用于读写以前注册状态的序列化器,这样用户就不会被锁定到任何特定的序列化模式中。当状态恢复时,将为状态注册一个新的序列化器(即,该序列化器附带用于访问恢复作业中的状态的

StateDescriptor

)。这个新的序列化器可能具有与前一个序列化器不同的模式。因此,在实现状态序列化器时,除了读取/写入数据的基本逻辑之外,另一件需要记住的重要事情是将来如何更改序列化模式。

当谈到模式时,在这种上下文中,这个术语可以在引用状态类型的数据模型和状态类型的序列化二进制格式之间互换。一般来说,模式在以下几种情况下会发生变化:

  1. 状态类型的数据模式已经演化,即从用作状态的POJO中添加或删除字段。
  2. 一般来说,在对数据模式进行更改之后,需要升级序列化程序的序列化格式。
  3. 序列化程序的配置已更改。

为了让新的执行获得关于状态的书面模式的信息,并检测模式是否发生了变化,在获取算子状态的保存点时,状态序列化器的快照需要与状态字节一起写入。这是一个抽象的

TypeSerializerSnapshot

,将在下一小节中解释。

TypeSerializerSnapshot 抽象

public interface TypeSerializerSnapshot<T> {
    int getCurrentVersion();
    void writeSnapshot(DataOuputView out) throws IOException;
    void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException;
    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer);
    TypeSerializer<T> restoreSerializer();
}
           
public abstract class TypeSerializer<T> {    
    
    // ...
    
    public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
}
           

 序列化器的

TypeSerializerSnapshot 

是一个时间点信息,它是关于状态序列化器的写模式的唯一来源,以及恢复与给定时间点相同的序列化程序所需的任何附加信息。当序列化器快照在 writeSnapshot 和 readSnapshot 方法中定义时,应该在恢复时写入和读取什么内容的逻辑。

请注意,快照本身的写模式可能也需要随时间变化(例如,当您希望向快照添加有关序列化程序的更多信息时)。为了促进这一点,对快照进行版本控制,并在 getCurrentVersion 方法中定义当前版本号。在恢复时,当从保存点读取序列化器快照时,写入快照的模式的版本将提供给 readSnapshot 方法,以便读实现可以处理不同的版本。

在恢复时,应该在 resolveSchemaCompatibility 方法中实现检测新序列化程序的模式是否更改的逻辑。在算子的恢复执行中,当先前的已注册状态被新的序列化器再次注册时,新的序列化器将通过此方法提供给先前的序列化器的快照。该方法返回一个

TypeSerializerSchemaCompatibility

,表示兼容性解析的结果,可以是以下内容之一:

  1. TypeSerializerSchemaCompatibility.compatibleAsIs():

    这个结果表明新的序列化器是兼容的,这意味着新的序列化器具有与前一个序列化器相同的模式。新的序列化器可能已经在 resolveSchemaCompatibility 方法中重新配置,因此它是兼容的。
  2. TypeSerializerSchemaCompatibility.compatibleAfterMigration():

    这个结果表明新的序列化器有不同的串行化模式,并可以从旧模式通过使用前面的序列化器(识别旧模式)读取字节状态对象,然后重写对象回到字节与新的序列化器(识别新模式)。
  3. TypeSerializerSchemaCompatibility.incompatible():

    这个结果表明新的序列化器有不同的序列化模式,但是不可能从旧的模式迁移。

最后一点细节是在需要迁移的情况下如何获得先前的序列化器。序列化器的

TypeSerializerSnapshot 

的另一个重要作用是,它充当一个工厂来恢复以前的序列化器。更具体地说,TypeSerializerSnapshot 应该实现 restoreSerializer 方法来实例化一个序列化器实例,该实例识别前一个序列化器的模式和配置,因此可以安全地读取前一个序列化器写的数据。

Flink 如何与 TypeSerializer 和 typeseriizersnapshot 抽象交互

最后,本节总结了 Flink (更具体地说是状态后端)如何与抽象交互。根据状态后端,交互略有不同,但这与状态序列化器及其序列化器快照的实现是正交的。

堆外状态后端(例如rocksdbstateback后端)

向具有模式a的状态序列化器注册新状态

  1. 状态的已注册类型序列化器用于对每个状态访问进行读写状态。
  2. 状态是在模式A中编写的。

取一个保存点

  1. 序列化器快照是通过 TypeSerializer#snapshotConfiguration 方法提取的。
  2. 序列化器快照被写入保存点以及已经序列化的状态字节(带有模式A)。

恢复的执行使用具有模式B的新状态序列化器重新访问恢复的状态字节

  1. 恢复前一个状态序列化器的快照。
  2. 状态字节在恢复时不反序列化,仅加载回状态后端(因此,仍然在模式A中)。
  3. 接收到新的序列化器后,通过类型序列化器 #resolveSchemaCompatibility 将其提供给恢复的前一个序列化器的快照,以检查模式兼容性。

将后端状态字节从模式A迁移到模式B

  1. 如果兼容性解决方案反映模式已经更改,并且可以进行迁移,则执行模式迁移。前一个识别模式A的状态序列化器将通过TypeSerializerSnapshot#restoreSerializer()从序列化器快照中获得,并用于将状态字节反序列化到对象,然后用新的序列化器重新写入对象,新的序列化器识别模式B来完成迁移。在继续处理之前,将所有访问状态的条目一起迁移。
  2. 如果分辨率显示不兼容,则状态访问失败,出现异常。

堆状态后端(例如 memorystateback、fsstateback )

向具有模式a的状态序列化器注册新状态

  1. 已注册的类型序列化器由状态后端维护。

取一个保存点,用模式a序列化所有状态

  1. 序列化器快照是通过 TypeSerializer#snapshotConfiguration 方法提取的。
  2. 序列化器快照被写入保存点。
  3. 状态对象现在序列化到保存点,以模式A编写。

在还原时,将状态反序列化为堆中的对象

  1. 恢复前一个状态序列化器的快照。
  2. 前面的序列化器可以识别模式A,它是通过 TypeSerializerSnapshot#restoreSerializer() 从序列化器快照中获得的,用于将状态字节反序列化到对象。
  3. 从现在开始,所有的状态都已经反序列化了。

恢复后的执行使用具有模式B的新状态序列化程序重新访问以前的状态

  1. 接收到新的序列化器后,通过类型序列化器#resolveSchemaCompatibility将其提供给恢复的前一个序列化器的快照,以检查模式兼容性。
  2. 如果兼容性检查表明需要迁移,那么在这种情况下什么也不会发生,因为对于堆后端,所有状态都已经反序列化为对象。
  3. 如果分辨率显示不兼容,则状态访问失败,出现异常。

采用另一个保存点,用模式B序列化所有状态

与步骤2相同。但是现在状态字节都在模式B中。

预定义的方便的TypeSerializerSnapshot类

Flink提供了两个抽象的基本

TypeSerializerSnapshot 

类,它们可以用于典型的场景:SimpleTypeSerializerSnapshot 和 CompositeTypeSerializerSnapshot。 

提供这些预定义快照作为其序列化器快照的序列化器必须始终具有自己的独立子类实现。这符合不跨不同序列化器共享快照类的最佳实践,下一节将更详细地解释这一点。

实现一个 SimpleTypeSerializerSnapshot

SimpleTypeSerializerSnapshot 用于没有任何状态或配置的序列化器,本质上意味着序列化器的序列化模式仅由序列化器的类定义。

当使用SimpleTypeSerializerSnapshot作为你的序列化器的快照类时,只有两种可能的兼容性解决结果:

  • TypeSerializerSchemaCompatibility.compatibleAsIs(),

    如果新的序列化器类保持相同,或者
  • TypeSerializerSchemaCompatibility.incompatible(),

    如果新的序列化器类与前一个不同。

下面是如何使用 SimpleTypeSerializerSnapshot 的一个例子,使用 Flink 的 IntSerializer 作为一个例子:

public class IntSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> {
    public IntSerializerSnapshot() {
        super(() -> IntSerializer.INSTANCE);
    }
}
           

IntSerializer 没有状态或配置。序列化格式仅由序列化器类本身定义,并且只能由另一个 IntSerializer 读取。因此,它适合 SimpleTypeSerializerSnapshot 的用例。

SimpleTypeSerializerSnapshot 的基础父类构造函数期望相应的序列化器的实例的提供者,不管快照是当前正在恢复还是在快照期间写入。

Supplier 

用于创建恢复序列化程序,以及用于验证新序列化程序是否属于预期的序列化程序类的类型检查。

实现一个 CompositeTypeSerializerSnapshot

CompositeTypeSerializerSnapshot 是为依赖于多个嵌套序列化器进行序列化的序列化器而设计的。

在进一步解释之前,我们将这个依赖于多个嵌套序列化器的序列化器称为上下文中的“外部”序列化器。例如 MapSerializer、ListSerializer、GenericArraySerializer 等等。例如,考虑 MapSerializer——键和值序列化器是嵌套的序列化器,而 MapSerializer 本身是“外部的”序列化器。

在这种情况下,外部序列化程序的快照还应该包含嵌套序列化程序的快照,以便可以独立地检查嵌套序列化程序的兼容性。在解决外部序列化器的兼容性时,需要考虑每个嵌套的序列化器的兼容性。

CompositeTypeSerializerSnapshot 被提供来帮助实现这些类型的组合序列化器的快照。它处理读取和写入嵌套的序列化器快照,并在考虑所有嵌套的序列化器兼容性的情况下解析最终的兼容性结果。

下面是如何使用 CompositeTypeSerializerSnapshot的一个例子,使用Flink的MapSerializer作为一个例子:

public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer> {

    private static final int CURRENT_VERSION = 1;

    public MapSerializerSnapshot() {
        super(MapSerializer.class);
    }

    public MapSerializerSnapshot(MapSerializer<K, V> mapSerializer) {
        super(mapSerializer);
    }

    @Override
    public int getCurrentOuterSnapshotVersion() {
        return CURRENT_VERSION;
    }

    @Override
    protected MapSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
        TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0];
        TypeSerializer<V> valueSerializer = (TypeSerializer<V>) nestedSerializers[1];
        return new MapSerializer<>(keySerializer, valueSerializer);
    }

    @Override
    protected TypeSerializer<?>[] getNestedSerializers(MapSerializer outerSerializer) {
        return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getValueSerializer() };
    }
}
           

当实现一个新的序列化器快照作为 CompositeTypeSerializerSnapshot 的子类时,必须实现以下三个方法:

  • getCurrentOuterSnapshotVersion():

    此方法定义当前外部序列化程序快照的序列化二进制格式的版本。
  • getNestedSerializers(TypeSerializer):

    给定外部序列化器,返回其嵌套的序列化器。
  • createOuterSerializerWithNestedSerializers(TypeSerializer[]):

    给定嵌套的序列化器,创建外部序列化器的实例。

上面的例子是一个 CompositeTypeSerializerSnapshot,其中除了嵌套的序列化器快照之外,没有额外的信息可以快照。因此,它的外部快照版本永远不需要增加。然而,其他一些序列化器包含一些额外的静态配置,需要与嵌套组件序列化器一起进行持久化。这方面的一个例子是 Flink 的 GenericArraySerializer,它作为配置包含数组元素类型的类,以及嵌套的元素序列化器。

在这些情况下,需要在 CompositeTypeSerializerSnapshot 上实现另外三个方法:

  • writeOuterSnapshot(DataOutputView):

    定义如何写入外部快照信息。
  • readOuterSnapshot(int, DataInputView, ClassLoader):

    定义如何读取外部快照信息。
  • isOuterSnapshotCompatible(TypeSerializer):

    检查外部快照信息是否保持相同。

默认情况下,CompositeTypeSerializerSnapshot 假定没有任何需要读写的外部快照信息,因此上面方法的默认实现是空的。如果子类具有外部快照信息,则必须实现所有三个方法。

下面是一个例子,使用 Flink 的 GenericArraySerializer 作为例子,说明 CompositeTypeSerializerSnapshot 是如何被用于具有外部快照信息的复合序列化器快照的:

public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> {

    private static final int CURRENT_VERSION = 1;

    private Class<C> componentClass;

    public GenericArraySerializerSnapshot() {
        super(GenericArraySerializer.class);
    }

    public GenericArraySerializerSnapshot(GenericArraySerializer<C> genericArraySerializer) {
        super(genericArraySerializer);
        this.componentClass = genericArraySerializer.getComponentClass();
    }

    @Override
    protected int getCurrentOuterSnapshotVersion() {
        return CURRENT_VERSION;
    }

    @Override
    protected void writeOuterSnapshot(DataOutputView out) throws IOException {
        out.writeUTF(componentClass.getName());
    }

    @Override
    protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
        this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
    }

    @Override
    protected boolean isOuterSnapshotCompatible(GenericArraySerializer newSerializer) {
        return this.componentClass == newSerializer.getComponentClass();
    }

    @Override
    protected GenericArraySerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
        TypeSerializer<C> componentSerializer = (TypeSerializer<C>) nestedSerializers[0];
        return new GenericArraySerializer<>(componentClass, componentSerializer);
    }

    @Override
    protected TypeSerializer<?>[] getNestedSerializers(GenericArraySerializer outerSerializer) {
        return new TypeSerializer<?>[] { outerSerializer.getComponentSerializer() };
    }
}
           

在上面的代码片段中有两件重要的事情需要注意。首先,由于这个 CompositeTypeSerializerSnapshot 实现具有作为快照的一部分写入的外部快照信息,所以当外部快照信息的序列化格式改变时,由 getCurrentOuterSnapshotVersion() 定义的外部快照版本必须被选中。

其次,注意我们如何在编写组件类时避免使用 Java 序列化,只编写类名并在读取快照时动态加载它。避免使用 Java 序列化来编写序列化器快照的内容通常是一个很好的实践。下一节将对此进行详细介绍。

实施说明和最佳实践

1、通过用类名实例化 Flink 恢复序列化程序快照

序列化程序的快照是已注册状态如何序列化的唯一真实来源,它作为保存点中读取状态的入口点。为了能够恢复和访问以前的状态,必须能够恢复以前的状态序列化程序的快照。

Flink 首先通过实例化 TypeSerializerSnapshot 及其类名(与快照字节一起写入)来恢复序列化器快照。因此,为了避免意外的类名更改或实例化失败,TypeSerializerSnapshot 类应该:

  • 避免实现为匿名类或嵌套类,
  • 具有用于实例化的公共空构造函数

2、避免在不同的序列化器之间共享相同的 typeseriizersnapshot 类

由于模式兼容性检查将通过序列化器快照进行,因此让多个序列化器返回相同的

TypeSerializerSnapshot 

类作为它们的快照将使

TypeSerializerSnapshot#resolveSchemaCompatibility 

TypeSerializerSnapshot#restoreSerializer() 

方法的实现变得复杂。

这也是一个糟糕的关注点分离;单个序列化程序的序列化模式、配置以及如何恢复它,应该整合到其专用的 TypeSerializerSnapshot 类中。

3、避免对序列化器快照内容使用Java序列化

在写入持久序列化器快照的内容时,根本不应该使用 Java 序列化。例如,一个序列化程序需要将其目标类型的一个类作为其快照的一部分。关于类的信息应该通过编写类名来持久化,而不是使用Java直接序列化类。读取快照时,将读取类名,并使用该类名动态加载类。

这种做法可以确保序列化器快照总是可以安全地读取。在上面的例子中,如果类型类是使用 Java 序列化持久化的,那么一旦类实现改变,快照就不再是可读的了,而且根据Java序列化的细节,也不再是二进制兼容的。

在 Flink 1.7 之前从废弃的序列化器快照APIs迁移

本节介绍如何从序列化器和在 Flink 1.7之前存在的序列化器快照迁移 API。

在 Flink 1.7之前,序列化器快照是作为 TypeSerializerConfigSnapshot 实现的(现在已经不推荐了,将来会被新的 TypeSerializerSnapshot 接口完全取代)。而且,序列化器模式兼容性检查的职责存在于

TypeSerializer 

中,在

TypeSerializer

#ensureCompatibility(typeseriizerconfigsnapshot) 方法中实现。

新旧抽象之间的另一个主要区别是,废弃的 TypeSerializerConfigSnapshot 没有实例化以前的序列化器的能力。因此,如果序列化器仍然返回 TypeSerializerConfigSnapshot 的子类作为其快照,则序列化器实例本身将始终使用 Java 序列化写入保存点,以便在恢复时可以使用前面的序列化器。这是非常不可取的,因为恢复作业是否成功容易受到前一个序列化器类的可用性的影响,或者通常来说,序列化器实例是否可以在恢复时使用 Java 序列化读取回来。这意味着您的状态只能使用相同的序列化器,而且一旦您想升级序列化器类或执行模式迁移,就会出现问题。

为了保证将来不受影响,并且具有迁移状态序列化器和模式的灵活性,强烈建议从旧的抽象进行迁移。具体操作步骤如下:

  1. 实现 TypeSerializerSnapshot 的一个新子类。这将是您的序列化程序的新快照。
  2. 在 TypeSerializer#snapshotConfiguration() 方法中,将新的 typeseriizersnapshot 作为您的序列化器的序列化器快照返回。
  3. 从存在于 Flink 1.7之前的保存点恢复作业,然后再次获取保存点。注意,在这一步中,序列化器的旧 TypeSerializerConfigSnapshot 必须仍然存在于类路径中,并且不能删除 TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) 方法的实现。这个过程的目的是用新实现的 serializer 的

    TypeSerializerSnapshot 

    替换在旧保存点中写入的

    TypeSerializerConfigSnapshot

  4. 一旦使用 Flink 1.7获取了保存点,保存点将包含 TypeSerializerSnapshot 作为状态序列化器快照,并且序列化器实例将不再写入保存点。现在,删除旧抽象的所有实现都是安全的(移除旧的

    TypeSerializerConfigSnapshot 实现就像序列化中的 TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) 一样

    )。