天天看点

19.DataStream API之State & Fault Tolerance(Working with State)

flink 1.9

State & Fault Tolerance

本文档解释了如何在开发应用程序时使用Flink的状态抽象。

Keyed State and Operator State

Flink中有两种基本状态:key值状态Keyed State和操作算子状态Operator State。

Keyed State

Keyed State总是与key相关,并且只能应用于KeyedStream的函数和操作算子中。

您可以将Keyed State视为已分区或分片的操作算子状态,每个key值对应一个状态分区。每个keyed-state在逻辑与<parallel-operator-instance, key>绑定在一起,由于每个key只“属于”keyed operator操作算子的一个并行实例,所以我们可以简单地将其看作<operator, key>。

Keyed State可以进一步的组成Key Group, Key Group是Flink重新分配Keyed State的最小单元,这里有跟定义的最大并行数一样多的Key Group,在运行时keyed operator的并行实例与key一起为一个或者多个Key Group工作。

Operator State

使用Operator State状态(或non-keyed state状态),每个操作算子状态Operator State都绑定到一个并行操作算子实例。Kafka Connector就是在Flink中使用Operator State的一个很好的例子。每个Kafka consumer的并行实例保存着一个topic分区和offsets偏移量的map(topic-partition,offsets)作为它的Operator State。

当并行度发生变化时,Operator State接口支持在并行操作实例中进行重新分配状态,这里有多种方法来进行重分配。

总结:flink的状态分为两种:keyed state和operator state

(1)Keyed state:每个键key状态在逻辑上是唯一组合的,并以<parallel-operator-instance, key>的形式进行绑定,因此状态存储形式是:<operator, key>---》<操作算子线程实例,key值>

(2)Operator state:每个操作算子状态都绑定到一个并行运算符实例上,以kafka connector为例,状态存储形式:<topic partition,offset>

Raw and Managed State

Keyed State和 Operator State存在两种形式:托管managed 和原生raw。

托管状态Managed State由Flink运行时控制的数据结构表示,如内部哈希表或RocksDB。例如“ValueState”、“ListState”等。Flink运行时对状态进行编码并将它们写入检查点checkpoints。(状态存储的数据结构由flink程序控制)

原始状态Raw State是操作算子保存它们自己的数据结构中的状态。当进行checkpoints时,它们仅仅写一串byte数组到checkpoint中。Flink并不知道State的数据结构,仅能看到原生的byte数组。(状态存储的数据结构由用户控制)

所有datastream函数都可以使用托管状态managed state,但是原始状态raw state接口只能在实现操作算子时使用。建议使用托管状态managed state(而不是原始状态raw state),因为使用托管状态(managed state),Flink可以在并行度发生更改时自动重新分配状态,而且还可以进行更好的内存管理。

Attention: 如果您的托管状态managed state需要自定义序列化逻辑,请参阅相应的指南,以确保将来的兼容性。Flink的默认序列化器不需要特殊处理。

Using Managed Keyed State

托管的键控state(managed keyed state)接口可以访问所有当前输入元素的key范围内的不同类型的state,这也就意味着这种类型的state只能被通过stream.keyBy(...)创建的KeyedStream使用。

现在我们首先来看一下可用的不同类型的state,然后在看它们是如何在程序中使用的,可用State的原语如下:

  • ValueState<T>:这里保存了一个可以更新和检索的值(由上述输入元素的key所限定,所以一个操作的每个key可能有一个值)。这个值可以使用update(T)来更新,使用T value()来获取。
  • ListState<T>:这个保存了一个元素列表,你可以追加元素以及获取一个保存当前所有元素的Iterable,可以通过调用add(T)或addAll(List<T>)来添加元素,通过调用Iterable<T> get()来获取元素,还可以通过调用update(List<T>)来更新元素。
  • ReducingState<T>: 这将保留一个值,该值表示添加到状态的所有值的聚合。该接口类似于ListState,这种状态通过用户传入的reduceFunction,每次调用add(T)方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。
  • AggregatingState<IN, OUT>:这将保留一个值,该值表示添加到状态的所有值的聚合。与ReducingState相反,聚合类型可能与添加到状态中的元素类型不同(例如:输入的是String类型,聚合输出是Integer类型)。接口与ListState相同,但是使用add(IN)添加元素,使用AggregateFunction将这些添加的元素聚合成一个值。
19.DataStream API之State &amp; Fault Tolerance(Working with State)
  • FoldingState<T, ACC>: 他保留一个值,表示添加到状态的所有值的聚合。与ReducingState相反,聚合类型可能与添加到状态中的元素类型不同。该接口类似于ListState,但是使用add(T)添加元素,使用FoldFunction将这些添加的元素聚合成一个值。(注意:这种状态将会在Flink未来版本(flink 1.4)中被删除,未来会使用AggregatingState<IN, OUT>替代)
  • MapState<UK, UV>: 这个保存了一个映射mappings列表,你可以添加key-value对到state中并且检索一个包含所有当前保存的映射的Iterable。映射mappings可以使用put(UK, UV)或者putAll(Map<UK, UV>)来添加。与key相关的value,可以使用get(UK)来获取,映射mappings的迭代iterable 、keys以及values可以分别调用entries(), keys()和values()来获取。

所有类型的state都有一个clear()方法来清除当前活动的key(及输入元素的key)的State。

注意FoldingState和FoldingStateDescriptor在Flink 1.4中已经被弃用,并在将来的版本中彻底删除。请使用AggregatingState和AggregatingStateDescriptor。

19.DataStream API之State &amp; Fault Tolerance(Working with State)

值得注意的是这些State对象仅用于与State接口,State并不只保存在内存中,也可能会在磁盘中或者其他地方,第二个需要注意的是从State中获取的值依赖于输入元素的key,因此如果涉及的key不同,那么再一次调用用户函数中获得的值可能与另一次调用的值不同。

为了获得一个State句柄,你需要创建一个StateDescriptor,这个StateDescriptor保存了state的名称(接下来我们会讲到,你可以创建若干个state,但是它们必须有唯一的名称以便你能够引用它们),State保存的值的类型,可能还有一个指定的函数,例如:ReduceFunction。根据你想要检索的state的类型,你可以创建ValueStateDescriptor、ListStateDescriptor、ReducingStateDescriptor、FoldingStateDescriptor或MapStateDescriptor。

State可以通过RuntimeContext来访问,所以只能在rich functions中使用。有关这方面的信息,请参阅这里here,接下来我们将也会看到一个示例。在RichFunction中的RuntimeContext有以下可以访问状态state的方法:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

这个FlatMapFunction例子展示了所有部件如何组合在一起:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)
           

 这个例子实现了一个简单的计数器,我们使用元组的第一个字段来进行分组(这个例子中,所有的key值都是1),这个函数将计数和运行时的总和保存在一个ValueState中,一旦计数大于2,它就会发出平均值并清除状态state,这样我们就又可以从0开始。请注意,如果我们在第一个字段中具有不同值的元组,那么这将为每个不同输入的key值保存不同的状态state值。

State Time-To-Live (TTL) ---> flink 1.6才引入的功能

生存时间(TTL)可以分配给任何类型的keyed state。如果配置了TTL,并且状态值已经过期,那么将尽最大努力清理存储值,下面将对此进行更详细的讨论。

每个状态都有TTL, 所有状态集合类型都支持给每个元素配置TTLs。这意味着list元素和map记录可以独立设置过期时间TTL。

为了使用状态TTL,必须首先构建一个StateTtlConfig配置对象。然后,可以通过传递配置在任何状态描述符中启用TTL功能:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
           

配置有几个选项需要考虑:

newBuilder的第一个参数是必选的,它表示状态的剩余生存时间TTL。

当状态的TTL刷新时,配置更新类型(默认是:OnCreateAndWrite):

  • StateTtlConfig.UpdateType.OnCreateAndWrite - 仅有创建和写入是访问
  • StateTtlConfig.UpdateType.OnReadAndWrite - 仅有读取时访问

状态可见性配置如果未清除过期值,则在读取访问时是否返回过期值(默认情况下,NeverReturnExpired):

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 永远不会返回过期的值(备注:TTL结束,即过期了的状态state就会被清除)
  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果仍然可用则返回(备注:TTL结束,即过期了的状态state不会立刻被清除,只有在调用ValueState.clear()后才会被清除)

在NeverReturnExpired模式下,过期状态表现的就好像它不再存在一样,即使它仍然必须被删除。该选项对于在TTL之后,必须严格限制数据再次被读取访问的场景非常有用,例如:对应处理隐私敏感数据的应用程序。

另一个选项ReturnExpiredIfNotCleanedUp允许在清理之前返回过期状态。

Notes:

  • 状态后端state backends存储了最后一次修改的时间戳和用户值,这意味着启用此功能会增加状态存储的消耗。对于堆状态后端 (Heap state backend)存储一个额外的Java对象,其中包含对用户状态对象的引用和内存中的原始long值。RocksDB状态后端为每个存储值value,列表list项或映射map项添加8个字节。
  • 目前在processing event模式下才支持状态的 TTL 。
  • 以前在没有状态TTL配置的模式下,如果试图恢复状态,通常使用TTL的描述符来实现,如果不使用描述符恢复状态,则将导致兼容性失败和statmigrationexception异常。。
  • TTL配置不是检查点checkpoint或保存点savepoints的一部分,而是Flink在当前运行的作业job中如何处理它的一种方式。
  • 只有当用户值序列化器能够处理空值时,当前具有TTL模式的map状态才支持空用户值。如果序列化器不支持null值,可以使用NullableSerializer对其进行包装,代价是在序列化形式中增加一个字节。

Cleanup of Expired State

默认情况下,过期值只有在显式读出时才会被删除,例如通过调用ValueState.value()。

注意:这意味着默认情况下,如果未读取过期状态,则不会删除它,这可能导致状态不断增大。这可能会在将来的版本中发生变化。

Cleanup in full snapshot清除整个快照

此外,您可以在获取完整状态快照时激活清理操作,这将减少状态的大小。当前实现不会清理本地状态,但从上一个快照snapshot恢复时,它不会包含已删除的过期状态。可以在StateTtlConfig中配置:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();
           

 此选项不适用于RocksDB状态后端中的增量检查点。

以后还会添加更多的策略在后台自动清理过期状态。

作用:

可以规定状态state(ValueState<T>、ListState<T>、MapState<UK, UV>...)的生存时间,防止state一直被checkpoint,导致checkpoint越来越大,最终程序崩溃。

样例demo:

state的TTL时间:3s

package org.apache.flink.examples.java.dataStremAPI.stateFaultTolerance;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class StreamDataSource  extends RichParallelSourceFunction<Tuple3<String, String, Long>> {
   private volatile boolean running = true;

   @Override
   public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws InterruptedException {

      Tuple3[] elements = new Tuple3[]{
         Tuple3.of("a", "1", 1551169050000L),
         Tuple3.of("aa", "33", 1551169064000L),
         Tuple3.of("a", "2", 1551169054000L),
         Tuple3.of("a", "3", 1551169064000L),
         Tuple3.of("b", "5", 1551169100000L),
         Tuple3.of("a", "4", 1551169079000L),
         Tuple3.of("aa", "44", 1551169079000L),
         Tuple3.of("b", "6", 1551169108000L)
      };

      int count = 0;
      while (running && count < elements.length) {
         ctx.collect(new Tuple3<>((String) elements[count].f0, (String) elements[count].f1, (Long) elements[count].f2));
         count++;
         Thread.sleep(1000);
      }
   }

   @Override
   public void cancel() {
      running = false;
   }
}
           
package org.apache.flink.examples.java.dataStremAPI.stateFaultTolerance;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class StateMangerTTL {
   public static void main(String[] args) throws Exception {

      Long delay = 5000L;
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(2);

      // 设置数据源
      DataStream<Tuple3<String, String, Long>> source = env.addSource(new StreamDataSource()).setParallelism(1).name("Demo Source");

      // 设置水位线
      DataStream<Tuple3<String, String, Long>> stream = source.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(org.apache.flink.streaming.api.windowing.time.Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple3<String, String, Long> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      );

      stream.keyBy(0).process(new KeyedProcessFunction<Tuple, Tuple3<String, String, Long>, Object>() {
         Long lastTime = 0L;
         ValueState<String> valueState;

         @Override
         public void open(Configuration parameters) throws Exception {
            System.out.println("open-------------:");
//状态的TTL时间:3s
            StateTtlConfig ttlConfig = StateTtlConfig
               .newBuilder(Time.seconds(3))
               .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
               //这个模式,TTL结束状态就会被清除,如果是ReturnExpiredIfNotCleanedUp
               //只有显示调用ValueState.value()后才会被清除
               .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
               .build();

            ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
            stateDescriptor.enableTimeToLive(ttlConfig);
            valueState = getRuntimeContext().getState(stateDescriptor);
         }

         @Override
         public void processElement(Tuple3<String, String, Long> value, Context context, Collector<Object> collector) throws Exception {
            System.out.println("processElement---------:" + value + " context.timestamp: " + context.timestamp() + " class: " + this.hashCode());
            valueState.update(value.f1);
            Thread.sleep(2000);
            System.out.println("------" + value.f1 + "--------: " + valueState.value());
            Thread.sleep(2000);
            System.out.println("------" + value.f1 + "--------: " + valueState.value());
            Long lastTimestamp = context.timestamp();
            //设置定时器30分钟,30分钟后未更新,则回调onTimer
            lastTime = lastTimestamp;
            context.timerService().registerEventTimeTimer(lastTimestamp + 20000);
         }

         @Override
         public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
            System.out.println("onTimer-------:" + timestamp + " OnTimerContext--key: " + ctx.getCurrentKey() + "---" + ctx.timeDomain());
            if (lastTime + 20000 == timestamp) {
               System.out.println("---onTimer--:" + timestamp + " ctx.key " + ctx.getCurrentKey());
            }
         }

         @Override
         public void close() throws Exception {
            System.out.println("close--------:");
         }
      }).print();
      env.execute("TimeWindowDemo");
   }
}
           
open-------------:
open-------------:
processElement---------:(a,1,1551169050000) context.timestamp: 1551169050000 class: 67103355
------1--------: 1
processElement---------:(b,5,1551169100000) context.timestamp: 1551169100000 class: 133908999
------1--------: null
processElement---------:(aa,33,1551169064000) context.timestamp: 1551169064000 class: 67103355
------5--------: 5
------33--------: 33
------5--------: null
processElement---------:(b,6,1551169108000) context.timestamp: 1551169108000 class: 133908999
------33--------: null
processElement---------:(a,2,1551169054000) context.timestamp: 1551169054000 class: 67103355
------6--------: 6
------2--------: 2
------6--------: null
onTimer-------:1551169120000 OnTimerContext--key: (b)---EVENT_TIME
onTimer-------:1551169128000 OnTimerContext--key: (b)---EVENT_TIME
---onTimer--:1551169128000 ctx.key (b)
close--------:
------2--------: null
processElement---------:(a,3,1551169064000) context.timestamp: 1551169064000 class: 67103355
------3--------: 3
------3--------: null
onTimer-------:1551169070000 OnTimerContext--key: (a)---EVENT_TIME
onTimer-------:1551169074000 OnTimerContext--key: (a)---EVENT_TIME
onTimer-------:1551169084000 OnTimerContext--key: (a)---EVENT_TIME
---onTimer--:1551169084000 ctx.key (a)
onTimer-------:1551169084000 OnTimerContext--key: (aa)---EVENT_TIME
---onTimer--:1551169084000 ctx.key (aa)
processElement---------:(a,4,1551169079000) context.timestamp: 1551169079000 class: 67103355
------4--------: 4
------4--------: null
processElement---------:(aa,44,1551169079000) context.timestamp: 1551169079000 class: 67103355
------44--------: 44
------44--------: null
onTimer-------:1551169099000 OnTimerContext--key: (a)---EVENT_TIME
---onTimer--:1551169099000 ctx.key (a)
onTimer-------:1551169099000 OnTimerContext--key: (aa)---EVENT_TIME
---onTimer--:1551169099000 ctx.key (aa)
close--------:
           

Cleanup in background

除了对整个快照进行清理外,还可以在后台激活清理操作。下面的选项将激活StateTtlConfig中的默认后台清理快照状态,如果后台支持该选项:

import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInBackground()
    .build();
           

 要对后台的一些特殊清理进行更细粒度的控制,可以按照下面的描述分别配置它。目前,堆状态后端(heap state backend)主要是增量清理,而RocksDB后端使用压缩过滤器在后台进行清理。

Incremental cleanup

另一个选项是增量地触发对某些状态项的清理。触发器可以是来自每个状态访问或/和每个记录处理的回调。如果这个清理策略对于某个状态是激活的,那么存储后端会在其所有条目上为该状态保留一个惰性全局迭代器。每次触发增量清理时,迭代器都会被提升。检查遍历的状态项,并清理过期的状态项。

这个功能可以在StateTtlConfig中激活:

import org.apache.flink.api.common.state.StateTtlConfig;
 StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally(10, true)
    .build();
           

这种策略有两个参数。第一个是每次清理触发的检查状态条目数。如果启用,则每次状态访问都将触发它。第二个参数定义是否为每个记录处理额外触发清理。如果启用默认的后台清理,则此策略将在堆后端(heap backend)激活,其中包含5个选中的条目,且每个记录处理不需要清理。

Notes:

  • 如果没有对状态进行访问或没有处理任何记录,则过期状态将持久存在。
  • 用于增量清理的时间增加了记录处理的延迟。
  • 目前,只有堆状态后端(heap state backend)实现了状态的增量清理。为RocksDB设置状态的增量清理是不起作用的。
  • 如果使用堆状态后端(heap state backend)进行同步快照(synchronous snapshotting),全局迭代器在迭代时保留所有keys的副本,因为它的特定实现不支持并发修改。启用此功能将增加内存消耗。异步快照没有这个问题。
  • 对于现有作业job,此清理策略(增量清理)可以在StateTtlConfig中随时激活或停用,例如从保存点重新启动后。

Cleanup during RocksDB compaction在RocksDB压缩期间进行清理

如果使用RocksDB状态后端,另一个清理策略是激活Flink特定的压缩过滤器。RocksDB定期运行异步压缩来合并状态更新和减少存储。Flink压缩过滤器使用TTL检查状态项的过期时间戳,并排除过期值。

默认情况下禁用此功能。必须首先在RocksDB后端激活它,通过设置Flink配置选项state.backend. RocksDB .ttl.compaction.filter.enabled,或者在为作业job创建自定义RocksDB状态后端时,调用RocksDBStateBackend::enableTtlCompactionFilter。然后任何带有TTL的状态都可以为其配置和使用过滤器:

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();
           

RocksDB压缩过滤器将在每次处理一定数量的状态项之后,从Flink查询用于检查过期的当前时间戳。您可以更改它,并将自定义值传递给StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)方法。更频繁地更新时间戳可以提高清理速度,但会降低压缩性能,因为它使用来自本地代码的JNI调用。如果启用默认的后台清理,那么RocksDB后端将激活此策略,每次处理1000个条目时将查询当前时间戳。

通过激活FlinkCompactionFilter的调试级别,您可以从RocksDB过滤器的本地代码中激活调试日志:

log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG
           

Notes:

  • 在压缩过程中调用TTL过滤器会减慢它的速度。TTL过滤器必须解析上次访问的时间戳,并检查每个正在压缩的key的每个存储状态条目的过期时间。对于集合状态类型(list or map),每个存储的元素也调用TTL过滤器进行检查。
  • 如果该特性与包含非固定字节长度元素的列表状态一起使用,原生TTL过滤器还必须为每个状态条目额外调用一个Flink java类型序列化器,该序列化器是元素在JNI上的序列化器,其中只有第一个元素已过期,才能确定下一个未过期元素的偏移量。
  • 对于现有作业job,此清理策略可以在StateTtlConfig中随时激活或停用,例如从保存点savepoint重新启动。

State in the Scala DataStream API

除了上面描述的接口之外,Scala API还为KeyedStream上具有单个ValueState的有状态的map()或flatMap()函数提供了快捷方式。user函数在一个选项中获取ValueState的当前值,并且必须返回一个更新后的值,该值将用于更新状态。

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })
           

Using Managed Operator State

要使用托管操作符算子状态,有状态函数可以实现更通用的CheckpointedFunction接口,也可以实现listcheckpoint <T extends Serializable>接口。

CheckpointedFunction

CheckpointedFunction接口通过不同的重新分配方案提供对non-keyed 状态的访问。它需要实现两个方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;
           

无论何时必须执行检查点,都会调用snapshotState()。每当初始化用户定义的函数时,都会调用对应的initializeState(),无论是在函数第一次初始化时,还是在函数从早期检查点实际恢复时。因此,initializeState()不仅是初始化不同类型状态的地方,而且是包含状态恢复逻辑的地方。

目前,支持list样式的托管操作算子状态。状态应该是一个可序列化对象的List,彼此独立,因此在重新调整时可以重新分配。换句话说,这些对象是可以重新分布non-keyed状态的最细粒度。根据状态访问方法,定义了以下重分发方案:

  • Even-split redistribution:每个操作算子都会返回一个状态state元素列表list。整个状态在逻辑上是所有列表lists的串联。在恢复/重新分发时,该列表list被平均地分成尽可能多的并行操作算子子列表sublists 。每个操作算子获取一个子列表,该子列表可以是空的,也可以包含一个或多个元素。例如,list<list1, list2, ... ,listn> ---> list<list1[e1,e2], list2[],list3[e1,e2,e3], list4[e1]>,如果并行度为1(parallelism =1),则操作算子的检查点checkpointed 状态包含元素list1,list2,list3,list4,当将并行度增加到2时,list1,list2可能会出现在操作算子实例0中,而list3,list4则会出现在操作算子实例1中。
  • Union redistribution:每个操作算子都会返回一个状态state元素列表list。整个状态在逻辑上是所有列表的串联。在恢复/重新分发时,每个操作算子都会获得状态元素的完整列表。例如,list<list1, list2, ... ,listn> ---> list<list1[e1,e2], list2[],list3[e1,e2,e3], list4[e1]>,如果并行度为1,则操作算子的检查点状态包含元素list1,list2,list3,list4,当将并行度增加到2时,list1,list2,list3,list4会出现在操作算子实例0中,而list1,list2,list3,list4也会出现在操作算子实例1中。

下面是一个有状态SinkFunction的例子,它使用CheckpointedFunction在将元素发送到外部之前缓冲它们。它演示了even-split redistribution模式的列表状态:

package org.apache.flink.examples.java.dataStremAPI.stateFaultTolerance;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class BufferingSink implements SinkFunction<Tuple3<String, String, Long>>,
   CheckpointedFunction {

   private final int threshold;

   private transient ListState<Tuple3<String, String, Long>> checkpointedState;

   private List<Tuple3<String, String, Long>> bufferedElements;

   public BufferingSink(int threshold) {
      this.threshold = threshold;
      this.bufferedElements = new ArrayList<>();
   }

   @Override
   public void invoke(Tuple3<String, String, Long> value) throws Exception {
      bufferedElements.add(value);
      if (bufferedElements.size() == threshold) {
         for (Tuple3<String, String, Long> element: bufferedElements) {
            // send it to the sink
         }
         bufferedElements.clear();
      }
   }

   /**
    * 每次进行checkpoint前,该方法会被调用,因此需要将list缓存的数据保存到listState状态中
    * @param context the context for drawing a snapshot of the operator
    * @throws Exception
    */
   @Override
   public void snapshotState(FunctionSnapshotContext context) throws Exception {
      checkpointedState.clear();//目的是清除前一个checkpoint的状态数据,保存新的下一个checkpoint的数据
      for (Tuple3<String, String, Long> element : bufferedElements) {
         checkpointedState.add(element);
      }
   }

   /**
    * 程序重启时或者第一次初始化用户对象方法时,该方法被调用,主要用于程序重启时,从checkpoint状态中恢复数据。
    * @param context the context for initializing the operator
    * @throws Exception
    */
   @Override
   public void initializeState(FunctionInitializationContext context) throws Exception {
      ListStateDescriptor<Tuple3<String, String, Long>> descriptor =
         new ListStateDescriptor<>(
            "buffered-elements",
            TypeInformation.of(new TypeHint<Tuple3<String, String, Long>>() {}));
      //状态分发策略:Even-split redistribution
      checkpointedState = context.getOperatorStateStore().getListState(descriptor);
       //状态分发策略:Union redistribution
       //checkpointedState = context.getOperatorStateStore().getUnionListState(descriptor);
      //我们使用上下文的isrestore()方法检查失败后是否正在恢复。如果这是真的,即我们正在恢复,则应用恢复逻辑。
      if (context.isRestored()) {
         for (Tuple3<String, String, Long> element : checkpointedState.get()) {
            bufferedElements.add(element);
         }
      }
   }
}  
           

 initializeState方法将FunctionInitializationContext作为参数。这用于初始化non-keyed状态“containers”。这是一个ListState类型的容器,其中非键(non-keyed)状态对象将在检查点时存储。

注意状态是如何初始化的,类似于keyed state状态,使用一个StateDescriptor,其中包含状态名和关于状态所持有值的类型的信息:

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);
           

状态访问方法的命名约定包含其重新分布模式及其状态结构。例如,要在恢复时使用具有联合重分发(union redistribution)模式的list state,可以使用getUnionListState(descriptor)访问该状态。如果方法名不包含重分发模式,例如getListState(descriptor),它仅仅意味着将使用基本的均分重分发(even-split redistribution)模式。

在初始化容器之后,我们使用上下文的isrestore()方法来检查在失败之后是否正在恢复。如果这是true,即正在恢复,则应用恢复逻辑。

如修改后的BufferingSink代码所示,状态初始化期间恢复的数据保存在一个ListState变量中,以备将来在snapshotState()中使用。在那里,ListState将清除前一个检查点checkpoint包含的所有对象,然后被我们想要检查的新对象填满。

另外,键控状态keyed state也可以在initializeState()方法中初始化。这可以使用提供的FunctionInitializationContext来完成。

ListCheckpointed

ListCheckpointed接口是CheckpointedFunction的一个有一定限制的变体接口,它只支持列表样式list-style的状态,在恢复时使用均分重分发(even-split redistribution)模式。它还需要实现两种方法:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;
           

恢复时处理这样的列表。如果状态不可重分区,则随时可以在snapshotState()中返回collection . singletonlist (MY_STATE)。

Stateful Source Functions

与其他操作算子相比,有状态sources需要更多的关注。为了对状态和输出集合进行原子性更新(对于故障/恢复时的exactly-once语义来说,这是必需的),用户需要从source上下文获取一个锁。

package org.apache.flink.examples.java.dataStremAPI.stateFaultTolerance;

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Collections;
import java.util.List;

public class CounterSource extends RichParallelSourceFunction<Long>
   implements ListCheckpointed<Long> {

   /**  current offset for exactly once semantics */
   private Long offset;

   /** flag for job cancellation */
   private volatile boolean isRunning = true;

   @Override
   public void run(SourceFunction.SourceContext<Long> ctx) {
      final Object lock = ctx.getCheckpointLock();

      while (isRunning) {
         // output and state update are atomic
         synchronized (lock) {
            ctx.collect(offset);
            offset += 1;
         }
      }
   }

   @Override
   public void cancel() {
      isRunning = false;
   }

   @Override
   public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
      //该list只有一个元素,且能序列化
      return Collections.singletonList(offset);
   }

   @Override
   public void restoreState(List<Long> state) {
      //state.size=1,不知道官网为什么这样写
      for (Long s : state)
         offset = s;
   }
}
           

当Flink完全确认检查点checkpoint时,一些操作可能需要这些信息来与外部世界进行通信。在本例中,请参见org.apache.flink.runtime.state.CheckpointListener接口。

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html

https://flink.sojb.cn/dev/stream/state/state.html

https://www.jianshu.com/p/6ed0ef5e2b74

继续阅读