天天看点

22.DataStream API之State & Fault Tolerance(Queryable State)

flink 1.9

Queryable State Beta

注意:用于可查询状态的客户端client APIs目前处于不断发展的状态,不能保证所提供的接口的稳定性。在即将到来的Flink版本中,很可能会更改客户端API。

简而言之,此函数将Flink的managed keyed(分区)状态(请参阅Working with State)暴露给外部,并允许用户从Flink外部查询作业的状态。对于某些情况,可查询状态消除了对外部系统(例如key-value存储)的分布式操作算子/事务的需要,这通常是实践中的瓶颈。此外,此函数对于调试可能特别有用。

注意:在查询状态对象时,无需任何同步或复制即可从并发线程访问该对象。这是一种设计选择,因为上述任何一种都会导致增加作业的延迟,我们希望避免这种情况。因为任何状态后端都使用Java heap space堆空间,例如MemoryStateBackend或FsStateBackend,在检索值时不能与副本一起使用,而是直接引用存储的值,读-修改-写(read-modify-write)模式是不安全,并且可能导致可查询状态服务器由于并发修改而失败。RocksDBStateBackend后端不会出现这些问题。

Architecture

在展示如何使用可查询状态之前,有必要简要描述组成该状态的实体。可查询状态特性由三个主要实体组成:

  1. QueryableStateClient(可能)运行在Flink集群之外并提交用户查询,
  2. QueryableStateClientProxy运行在每个TaskManager上(即在Flink集群中),负责接收客户端的查询,代表客户机从负责的任务管理器获取请求的状态,并将其返回给客户端,
  3. QueryableStateServer运行在每个TaskManager上,负责为本地存储的状态提供服务。

客户端连接到其中一个代理,并发送与特定Keys关联的状态的请求k。如“Working with State”中描述,keyed state按Key Groups进行分配 ,每个TaskManager都分配了许多这些key groups组。为了找出哪个TaskManager负责Keys groups组持有k,代理将询问JobManager。根据答案,代理将查询运行在TaskManager上的QueryableStateServer,以获得与k关联的状态,并将响应转发回客户端。

Activating Queryable State

要在Flink集群上启用可查询状态,需要执行以下操作:

  1. 将Flink -queryable-state-runtime_2.11-1.9.0.jar从Flink发行版(Flink distribution)的opt/文件夹复制到lib/文件夹。
  2. 设置属性queryable-state.enable为true。有关详细信息和其他参数,请参阅配置文档Configuration。

要验证集群是否在启用可查询状态的情况下运行,请检查任意一个TaskManager的日志:"Started the Queryable State Proxy Server @ ...".

Making State Queryable

现在您已经激活了集群上的可查询状态,现在是时候看看如何使用它了。为了使一个状态对外界可见,需要显式地使用以下方法使其可查询:

  1. QueryableStateStream是一个方便的对象,它充当一个接收器,并提供其传入值作为可查询状态,或者
  2. 该stateDescriptor.setQueryable(String queryableStateName) 方法使得由状态描述符(state descriptor)表示的keyed state状态可查询。

下面几节解释这两种方法的使用。

Queryable State Stream

在KeyedStream上调用. asqueryablestate (stateName, stateDescriptor)将返回一个QueryableStateStream,它将其值提供为可查询状态。根据状态的类型,asQueryableState()方法有以下几种变体:

// ValueState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ValueStateDescriptor stateDescriptor)

// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)

// FoldingState
QueryableStateStream asQueryableState(
    String queryableStateName,
    FoldingStateDescriptor stateDescriptor)

// ReducingState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ReducingStateDescriptor stateDescriptor)
           

注意:没有可查询的ListState sink接收器,因为它会导致一个不断增长的列表,而这个列表可能不会被清除,因此最终会消耗太多内存。

返回的QueryableStateStream可以看作是一个sink接收器,不能进一步转换。在内部,QueryableStateStream被转换为一个操作算子,该操作算子使用所有传入的记录来更新可查询状态实例。更新逻辑由asQueryableState调用中提供的状态描述符类型进行暗示。在下面的程序中,键控流(keyed stream)的所有记录都将通过ValueState.update(value)更新状态实例

stream.keyBy(0).asQueryableState("query-name")
           

 这类似于Scala API的flatMapWithState。

Managed Keyed State

通过使相应的状态描述符进行查询,可以使操作算子的Managed keyed state状态(请参阅Using Managed Keyed State)StateDescriptor.setQueryable(String queryableStateName),如下面的例子所示:

ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
        new ValueStateDescriptor<>(
                "average", // the state name
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
descriptor.setQueryable("query-name"); // queryable state name
           

注意:queryableStateName参数可以任意选择,只用于查询。它不必与状态自己的名字相同。

对于哪种状态类型可以查询,此变体没有限制。这意味着它可以用于任何状态类型,例如:ValueState、ReduceState、ListState、MapState、AggregatingState和当前不推荐的FoldingState。

Querying State

到目前为止,您已经将集群设置为使用可查询状态运行,并且已经将(一些)状态声明为可查询状态。现在是查看如何查询此状态的时候了。

为此,您可以使用QueryableStateClient helper类。这在flink-queryable-state-client jar中是可用的,它必须与flink-core一起作为依赖项显式地包含在项目的pom.xml中,如下所示: 

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-core</artifactId>
  <version>1.9.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-queryable-state-client-java</artifactId>
  <version>1.9.0</version>
</dependency>
           

有关更多信息,您可以查看如何设置Flink程序set up a Flink program。

QueryableStateClient将您的查询提交给内部代理,然后该代理将处理您的查询并返回最终结果。初始化客户端的唯一要求是提供一个有效的TaskManager主机名(请记住,每个TaskManager上都运行着一个可查询的状态代理)和代理侦听的端口。有关如何配置代理和状态服务器端口的更多信息,请参见配置部分 Configuration Section。

QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
           

 在客户端就绪后,要查询V类型的状态,与K类型的键相关联,可以使用以下方法:

CompletableFuture<S> getKvState(
    JobID jobId,
    String queryableStateName,
    K key,
    TypeInformation<K> keyTypeInfo,
    StateDescriptor<S, V> stateDescriptor)
           

上面返回一个CompletableFuture ,最终包含可查询状态实例的状态值,该实例由JobID作业的queryableStateName标识。如果该key是您感兴趣的状态的key ,那么keyTypeInfo将告诉Flink如何序列化/反序列化这个key。最后,状态描述符包含关于请求状态的必要信息,即它的类型(值、Reduce等)和关于如何序列化/反序列化它的必要信息。

细心的读者会注意到返回的future包含一个类型S的值,即一个包含实际值的State状态对象。这可以是Flink支持的任何状态类型:ValueState、ReduceState、ListState、MapState、AggregatingState和当前不推荐的FoldingState。

注意:这些状态对象不允许修改所包含的状态。您可以使用它们来获取状态的实际值,例如使用valueState.get(),或者迭代包含的<K, V>元素,例如使用mapState.entries(),但是您不能修改它们。例如,在返回的列表状态上调用add()方法将引发UnsupportedOperationException。

注意:客户端是异步的,可以由多个线程共享。在未使用时,需要通过QueryableStateClient.shutdown()关闭它,以便释放资源。

Example

下面的示例扩展了CountWindowAverage示例(参见Using Managed Keyed State),使其可查询,并展示了如何查询这个值:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> currentSum = sum.value();
        currentSum.f0 += 1;
        currentSum.f1 += input.f1;
        sum.update(currentSum);

        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
        descriptor.setQueryable("query-name");
        sum = getRuntimeContext().getState(descriptor);
    }
}
一旦在作业中使用,您可以检索作业ID,然后从该操作算子查询任何键key的当前状态:
QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);

// the state descriptor of the state to be fetched.
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
        new ValueStateDescriptor<>(
          "average",
          TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
        client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);

// now handle the returned value
resultFuture.thenAccept(response -> {
        try {
            Tuple2<Long, Long> res = response.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
});
           

以下配置参数影响可查询状态服务器和客户端的行为。它们在QueryableStateOptions中定义。

State Server

  • queryable-state.server.ports:可查询状态服务器的服务器端口范围。如果同一台机器上运行多个任务管理器,这对于避免端口冲突非常有用。指定的范围可以是:一个端口:“9123”,一个端口范围:“50100-50200”,或者一个范围和点列表:“50100-50200,50300-50400,51234”。默认端口是9067。
  • queryable-state.server.network-threads: 接收状态服务器传入请求的网络(事件循环)线程数 (0 => #slots)
  • queryable-state.server.query-threads: 处理/服务状态服务器传入请求的线程数(0 => #slots)

Proxy

  • queryable-state.proxy.ports:可查询状态代理的服务器端口范围。如果同一台机器上运行多个任务管理器,这对于避免端口冲突非常有用。指定的范围可以是:一个端口:“9123”,一个端口范围:“50100-50200”,或者一个范围和点列表:“50100-50200,50300-50400,51234”。默认端口是9069。
  • queryable-state.proxy.network-threads:接收客户端代理传入请求的网络(事件循环)线程数(0 => #slots)
  • queryable-state.proxy.query-threads:为客户端代理处理/服务传入请求的线程数 (0 => #slots)

Limitations

  • 可查询状态生命周期绑定到作业的生命周期,例如,任务在启动时注册可查询状态,在处理时注销可查询状态。在未来的版本中,最好将其解耦,以便允许在任务完成后进行查询,并通过状态复制加速恢复。
  • 关于可用KvState的通知通过一个简单的告诉发生在未来,应该通过询问和确认来改进这一点。
  • 服务器和客户端跟踪查询的统计信息。由于它们不会在任何地方公开,因此默认情况下它们是禁用的。一旦有更好的支持通过Metrics系统发布这些数字,我们就应该启用统计数据。

总结:

Queryable State: (flink 1.2引入的)

flink的状态管理对外部是可访问的,在查询状态对象时,为了减少延迟,并发线程访问该对象时,不会进行任何同步或复制操作。由于任何使用Java堆空间的状态后端,例如memorystateback或fsstateback,在检索值时都不使用副本,而是直接引用存储值,因此read-modify-write模式是不安全的,并且可能导致可查询的状态服务器由于并发修改而失败。RocksDBStateBackend 端不会出现这些问题。

可查询状态特性由三个主要实体组成:

(1)QueryableStateClient:用于用户查询的客户端;

(2)QueryableStateClientProxy:负责接收客户端查询,代表客户端从相应的Task Manager获取请求状态,并将其返回给客户端;

(3)QueryableStateServer :它运行在每个Task Manager上,负责为本地存储的状态提供服务。

22.DataStream API之State &amp; Fault Tolerance(Queryable State)

https://www.jianshu.com/p/ecb4eea8ef60

https://www.jianshu.com/p/7fe9bee59ceb

Custom Serialization for Managed State:为managed state自定义序列化类

接口TypeSerializerSnapshot :flink 1.7引入的,1.7之前是TypeSerializerConfigSnapshot

public interface TypeSerializerSnapshot<T> {
    int getCurrentVersion();

    void writeSnapshot(DataOutputView var1) throws IOException;

    void readSnapshot(int var1, DataInputView var2, ClassLoader var3) throws IOException;

    TypeSerializer<T> restoreSerializer();

    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> var1);

    static void writeVersionedSnapshot(DataOutputView out, TypeSerializerSnapshot<?> snapshot) throws IOException {
        out.writeUTF(snapshot.getClass().getName());
        out.writeInt(snapshot.getCurrentVersion());
        snapshot.writeSnapshot(out);
    }

    static <T> TypeSerializerSnapshot<T> readVersionedSnapshot(DataInputView in, ClassLoader cl) throws IOException {
        TypeSerializerSnapshot<T> snapshot = TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(in, cl);
        int version = in.readInt();
        snapshot.readSnapshot(version, in, cl);
        return snapshot;
    }
}
           

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/queryable_state.html

https://flink.sojb.cn/dev/stream/state/queryable_state.html

继续阅读