天天看點

19.DataStream API之State & Fault Tolerance(Working with State)

flink 1.9

State & Fault Tolerance

本文檔解釋了如何在開發應用程式時使用Flink的狀态抽象。

Keyed State and Operator State

Flink中有兩種基本狀态:key值狀态Keyed State和操作算子狀态Operator State。

Keyed State

Keyed State總是與key相關,并且隻能應用于KeyedStream的函數和操作算子中。

您可以将Keyed State視為已分區或分片的操作算子狀态,每個key值對應一個狀态分區。每個keyed-state在邏輯與<parallel-operator-instance, key>綁定在一起,由于每個key隻“屬于”keyed operator操作算子的一個并行執行個體,是以我們可以簡單地将其看作<operator, key>。

Keyed State可以進一步的組成Key Group, Key Group是Flink重新配置設定Keyed State的最小單元,這裡有跟定義的最大并行數一樣多的Key Group,在運作時keyed operator的并行執行個體與key一起為一個或者多個Key Group工作。

Operator State

使用Operator State狀态(或non-keyed state狀态),每個操作算子狀态Operator State都綁定到一個并行操作算子執行個體。Kafka Connector就是在Flink中使用Operator State的一個很好的例子。每個Kafka consumer的并行執行個體儲存着一個topic分區和offsets偏移量的map(topic-partition,offsets)作為它的Operator State。

當并行度發生變化時,Operator State接口支援在并行操作執行個體中進行重新配置設定狀态,這裡有多種方法來進行重配置設定。

總結:flink的狀态分為兩種:keyed state和operator state

(1)Keyed state:每個鍵key狀态在邏輯上是唯一組合的,并以<parallel-operator-instance, key>的形式進行綁定,是以狀态存儲形式是:<operator, key>---》<操作算子線程執行個體,key值>

(2)Operator state:每個操作算子狀态都綁定到一個并行運算符執行個體上,以kafka connector為例,狀态存儲形式:<topic partition,offset>

Raw and Managed State

Keyed State和 Operator State存在兩種形式:托管managed 和原生raw。

托管狀态Managed State由Flink運作時控制的資料結構表示,如内部哈希表或RocksDB。例如“ValueState”、“ListState”等。Flink運作時對狀态進行編碼并将它們寫入檢查點checkpoints。(狀态存儲的資料結構由flink程式控制)

原始狀态Raw State是操作算子儲存它們自己的資料結構中的狀态。當進行checkpoints時,它們僅僅寫一串byte數組到checkpoint中。Flink并不知道State的資料結構,僅能看到原生的byte數組。(狀态存儲的資料結構由使用者控制)

所有datastream函數都可以使用托管狀态managed state,但是原始狀态raw state接口隻能在實作操作算子時使用。建議使用托管狀态managed state(而不是原始狀态raw state),因為使用托管狀态(managed state),Flink可以在并行度發生更改時自動重新配置設定狀态,而且還可以進行更好的記憶體管理。

Attention: 如果您的托管狀态managed state需要自定義序列化邏輯,請參閱相應的指南,以確定将來的相容性。Flink的預設序列化器不需要特殊處理。

Using Managed Keyed State

托管的鍵控state(managed keyed state)接口可以通路所有目前輸入元素的key範圍内的不同類型的state,這也就意味着這種類型的state隻能被通過stream.keyBy(...)建立的KeyedStream使用。

現在我們首先來看一下可用的不同類型的state,然後在看它們是如何在程式中使用的,可用State的原語如下:

  • ValueState<T>:這裡儲存了一個可以更新和檢索的值(由上述輸入元素的key所限定,是以一個操作的每個key可能有一個值)。這個值可以使用update(T)來更新,使用T value()來擷取。
  • ListState<T>:這個儲存了一個元素清單,你可以追加元素以及擷取一個儲存目前所有元素的Iterable,可以通過調用add(T)或addAll(List<T>)來添加元素,通過調用Iterable<T> get()來擷取元素,還可以通過調用update(List<T>)來更新元素。
  • ReducingState<T>: 這将保留一個值,該值表示添加到狀态的所有值的聚合。該接口類似于ListState,這種狀态通過使用者傳入的reduceFunction,每次調用add(T)方法添加值的時候,會調用reduceFunction,最後合并到一個單一的狀态值。
  • AggregatingState<IN, OUT>:這将保留一個值,該值表示添加到狀态的所有值的聚合。與ReducingState相反,聚合類型可能與添加到狀态中的元素類型不同(例如:輸入的是String類型,聚合輸出是Integer類型)。接口與ListState相同,但是使用add(IN)添加元素,使用AggregateFunction将這些添加的元素聚合成一個值。
19.DataStream API之State &amp; Fault Tolerance(Working with State)
  • FoldingState<T, ACC>: 他保留一個值,表示添加到狀态的所有值的聚合。與ReducingState相反,聚合類型可能與添加到狀态中的元素類型不同。該接口類似于ListState,但是使用add(T)添加元素,使用FoldFunction将這些添加的元素聚合成一個值。(注意:這種狀态将會在Flink未來版本(flink 1.4)中被删除,未來會使用AggregatingState<IN, OUT>替代)
  • MapState<UK, UV>: 這個儲存了一個映射mappings清單,你可以添加key-value對到state中并且檢索一個包含所有目前儲存的映射的Iterable。映射mappings可以使用put(UK, UV)或者putAll(Map<UK, UV>)來添加。與key相關的value,可以使用get(UK)來擷取,映射mappings的疊代iterable 、keys以及values可以分别調用entries(), keys()和values()來擷取。

所有類型的state都有一個clear()方法來清除目前活動的key(及輸入元素的key)的State。

注意FoldingState和FoldingStateDescriptor在Flink 1.4中已經被棄用,并在将來的版本中徹底删除。請使用AggregatingState和AggregatingStateDescriptor。

19.DataStream API之State &amp; Fault Tolerance(Working with State)

值得注意的是這些State對象僅用于與State接口,State并不隻儲存在記憶體中,也可能會在磁盤中或者其他地方,第二個需要注意的是從State中擷取的值依賴于輸入元素的key,是以如果涉及的key不同,那麼再一次調用使用者函數中獲得的值可能與另一次調用的值不同。

為了獲得一個State句柄,你需要建立一個StateDescriptor,這個StateDescriptor儲存了state的名稱(接下來我們會講到,你可以建立若幹個state,但是它們必須有唯一的名稱以便你能夠引用它們),State儲存的值的類型,可能還有一個指定的函數,例如:ReduceFunction。根據你想要檢索的state的類型,你可以建立ValueStateDescriptor、ListStateDescriptor、ReducingStateDescriptor、FoldingStateDescriptor或MapStateDescriptor。

State可以通過RuntimeContext來通路,是以隻能在rich functions中使用。有關這方面的資訊,請參閱這裡here,接下來我們将也會看到一個示例。在RichFunction中的RuntimeContext有以下可以通路狀态state的方法:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

這個FlatMapFunction例子展示了所有部件如何組合在一起:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)
           

 這個例子實作了一個簡單的計數器,我們使用元組的第一個字段來進行分組(這個例子中,所有的key值都是1),這個函數将計數和運作時的總和儲存在一個ValueState中,一旦計數大于2,它就會發出平均值并清除狀态state,這樣我們就又可以從0開始。請注意,如果我們在第一個字段中具有不同值的元組,那麼這将為每個不同輸入的key值儲存不同的狀态state值。

State Time-To-Live (TTL) ---> flink 1.6才引入的功能

生存時間(TTL)可以配置設定給任何類型的keyed state。如果配置了TTL,并且狀态值已經過期,那麼将盡最大努力清理存儲值,下面将對此進行更詳細的讨論。

每個狀态都有TTL, 所有狀态集合類型都支援給每個元素配置TTLs。這意味着list元素和map記錄可以獨立設定過期時間TTL。

為了使用狀态TTL,必須首先建構一個StateTtlConfig配置對象。然後,可以通過傳遞配置在任何狀态描述符中啟用TTL功能:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
           

配置有幾個選項需要考慮:

newBuilder的第一個參數是必選的,它表示狀态的剩餘生存時間TTL。

當狀态的TTL重新整理時,配置更新類型(預設是:OnCreateAndWrite):

  • StateTtlConfig.UpdateType.OnCreateAndWrite - 僅有建立和寫入是通路
  • StateTtlConfig.UpdateType.OnReadAndWrite - 僅有讀取時通路

狀态可見性配置如果未清除過期值,則在讀取通路時是否傳回過期值(預設情況下,NeverReturnExpired):

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 永遠不會傳回過期的值(備注:TTL結束,即過期了的狀态state就會被清除)
  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果仍然可用則傳回(備注:TTL結束,即過期了的狀态state不會立刻被清除,隻有在調用ValueState.clear()後才會被清除)

在NeverReturnExpired模式下,過期狀态表現的就好像它不再存在一樣,即使它仍然必須被删除。該選項對于在TTL之後,必須嚴格限制資料再次被讀取通路的場景非常有用,例如:對應處理隐私敏感資料的應用程式。

另一個選項ReturnExpiredIfNotCleanedUp允許在清理之前傳回過期狀态。

Notes:

  • 狀态後端state backends存儲了最後一次修改的時間戳和使用者值,這意味着啟用此功能會增加狀态存儲的消耗。對于堆狀态後端 (Heap state backend)存儲一個額外的Java對象,其中包含對使用者狀态對象的引用和記憶體中的原始long值。RocksDB狀态後端為每個存儲值value,清單list項或映射map項添加8個位元組。
  • 目前在processing event模式下才支援狀态的 TTL 。
  • 以前在沒有狀态TTL配置的模式下,如果試圖恢複狀态,通常使用TTL的描述符來實作,如果不使用描述符恢複狀态,則将導緻相容性失敗和statmigrationexception異常。。
  • TTL配置不是檢查點checkpoint或儲存點savepoints的一部分,而是Flink在目前運作的作業job中如何處理它的一種方式。
  • 隻有當使用者值序列化器能夠處理空值時,目前具有TTL模式的map狀态才支援空使用者值。如果序列化器不支援null值,可以使用NullableSerializer對其進行包裝,代價是在序列化形式中增加一個位元組。

Cleanup of Expired State

預設情況下,過期值隻有在顯式讀出時才會被删除,例如通過調用ValueState.value()。

注意:這意味着預設情況下,如果未讀取過期狀态,則不會删除它,這可能導緻狀态不斷增大。這可能會在将來的版本中發生變化。

Cleanup in full snapshot清除整個快照

此外,您可以在擷取完整狀态快照時激活清理操作,這将減少狀态的大小。目前實作不會清理本地狀态,但從上一個快照snapshot恢複時,它不會包含已删除的過期狀态。可以在StateTtlConfig中配置:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();
           

 此選項不适用于RocksDB狀态後端中的增量檢查點。

以後還會添加更多的政策在背景自動清理過期狀态。

作用:

可以規定狀态state(ValueState<T>、ListState<T>、MapState<UK, UV>...)的生存時間,防止state一直被checkpoint,導緻checkpoint越來越大,最終程式崩潰。

樣例demo:

state的TTL時間:3s

package org.apache.flink.examples.java.dataStremAPI.stateFaultTolerance;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class StreamDataSource  extends RichParallelSourceFunction<Tuple3<String, String, Long>> {
   private volatile boolean running = true;

   @Override
   public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws InterruptedException {

      Tuple3[] elements = new Tuple3[]{
         Tuple3.of("a", "1", 1551169050000L),
         Tuple3.of("aa", "33", 1551169064000L),
         Tuple3.of("a", "2", 1551169054000L),
         Tuple3.of("a", "3", 1551169064000L),
         Tuple3.of("b", "5", 1551169100000L),
         Tuple3.of("a", "4", 1551169079000L),
         Tuple3.of("aa", "44", 1551169079000L),
         Tuple3.of("b", "6", 1551169108000L)
      };

      int count = 0;
      while (running && count < elements.length) {
         ctx.collect(new Tuple3<>((String) elements[count].f0, (String) elements[count].f1, (Long) elements[count].f2));
         count++;
         Thread.sleep(1000);
      }
   }

   @Override
   public void cancel() {
      running = false;
   }
}
           
package org.apache.flink.examples.java.dataStremAPI.stateFaultTolerance;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class StateMangerTTL {
   public static void main(String[] args) throws Exception {

      Long delay = 5000L;
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(2);

      // 設定資料源
      DataStream<Tuple3<String, String, Long>> source = env.addSource(new StreamDataSource()).setParallelism(1).name("Demo Source");

      // 設定水位線
      DataStream<Tuple3<String, String, Long>> stream = source.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(org.apache.flink.streaming.api.windowing.time.Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple3<String, String, Long> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      );

      stream.keyBy(0).process(new KeyedProcessFunction<Tuple, Tuple3<String, String, Long>, Object>() {
         Long lastTime = 0L;
         ValueState<String> valueState;

         @Override
         public void open(Configuration parameters) throws Exception {
            System.out.println("open-------------:");
//狀态的TTL時間:3s
            StateTtlConfig ttlConfig = StateTtlConfig
               .newBuilder(Time.seconds(3))
               .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
               //這個模式,TTL結束狀态就會被清除,如果是ReturnExpiredIfNotCleanedUp
               //隻有顯示調用ValueState.value()後才會被清除
               .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
               .build();

            ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
            stateDescriptor.enableTimeToLive(ttlConfig);
            valueState = getRuntimeContext().getState(stateDescriptor);
         }

         @Override
         public void processElement(Tuple3<String, String, Long> value, Context context, Collector<Object> collector) throws Exception {
            System.out.println("processElement---------:" + value + " context.timestamp: " + context.timestamp() + " class: " + this.hashCode());
            valueState.update(value.f1);
            Thread.sleep(2000);
            System.out.println("------" + value.f1 + "--------: " + valueState.value());
            Thread.sleep(2000);
            System.out.println("------" + value.f1 + "--------: " + valueState.value());
            Long lastTimestamp = context.timestamp();
            //設定定時器30分鐘,30分鐘後未更新,則回調onTimer
            lastTime = lastTimestamp;
            context.timerService().registerEventTimeTimer(lastTimestamp + 20000);
         }

         @Override
         public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
            System.out.println("onTimer-------:" + timestamp + " OnTimerContext--key: " + ctx.getCurrentKey() + "---" + ctx.timeDomain());
            if (lastTime + 20000 == timestamp) {
               System.out.println("---onTimer--:" + timestamp + " ctx.key " + ctx.getCurrentKey());
            }
         }

         @Override
         public void close() throws Exception {
            System.out.println("close--------:");
         }
      }).print();
      env.execute("TimeWindowDemo");
   }
}
           
open-------------:
open-------------:
processElement---------:(a,1,1551169050000) context.timestamp: 1551169050000 class: 67103355
------1--------: 1
processElement---------:(b,5,1551169100000) context.timestamp: 1551169100000 class: 133908999
------1--------: null
processElement---------:(aa,33,1551169064000) context.timestamp: 1551169064000 class: 67103355
------5--------: 5
------33--------: 33
------5--------: null
processElement---------:(b,6,1551169108000) context.timestamp: 1551169108000 class: 133908999
------33--------: null
processElement---------:(a,2,1551169054000) context.timestamp: 1551169054000 class: 67103355
------6--------: 6
------2--------: 2
------6--------: null
onTimer-------:1551169120000 OnTimerContext--key: (b)---EVENT_TIME
onTimer-------:1551169128000 OnTimerContext--key: (b)---EVENT_TIME
---onTimer--:1551169128000 ctx.key (b)
close--------:
------2--------: null
processElement---------:(a,3,1551169064000) context.timestamp: 1551169064000 class: 67103355
------3--------: 3
------3--------: null
onTimer-------:1551169070000 OnTimerContext--key: (a)---EVENT_TIME
onTimer-------:1551169074000 OnTimerContext--key: (a)---EVENT_TIME
onTimer-------:1551169084000 OnTimerContext--key: (a)---EVENT_TIME
---onTimer--:1551169084000 ctx.key (a)
onTimer-------:1551169084000 OnTimerContext--key: (aa)---EVENT_TIME
---onTimer--:1551169084000 ctx.key (aa)
processElement---------:(a,4,1551169079000) context.timestamp: 1551169079000 class: 67103355
------4--------: 4
------4--------: null
processElement---------:(aa,44,1551169079000) context.timestamp: 1551169079000 class: 67103355
------44--------: 44
------44--------: null
onTimer-------:1551169099000 OnTimerContext--key: (a)---EVENT_TIME
---onTimer--:1551169099000 ctx.key (a)
onTimer-------:1551169099000 OnTimerContext--key: (aa)---EVENT_TIME
---onTimer--:1551169099000 ctx.key (aa)
close--------:
           

Cleanup in background

除了對整個快照進行清理外,還可以在背景激活清理操作。下面的選項将激活StateTtlConfig中的預設背景清理快照狀态,如果背景支援該選項:

import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInBackground()
    .build();
           

 要對背景的一些特殊清理進行更細粒度的控制,可以按照下面的描述分别配置它。目前,堆狀态後端(heap state backend)主要是增量清理,而RocksDB後端使用壓縮過濾器在背景進行清理。

Incremental cleanup

另一個選項是增量地觸發對某些狀态項的清理。觸發器可以是來自每個狀态通路或/和每個記錄處理的回調。如果這個清理政策對于某個狀态是激活的,那麼存儲後端會在其所有條目上為該狀态保留一個惰性全局疊代器。每次觸發增量清理時,疊代器都會被提升。檢查周遊的狀态項,并清理過期的狀态項。

這個功能可以在StateTtlConfig中激活:

import org.apache.flink.api.common.state.StateTtlConfig;
 StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally(10, true)
    .build();
           

這種政策有兩個參數。第一個是每次清理觸發的檢查狀态條目數。如果啟用,則每次狀态通路都将觸發它。第二個參數定義是否為每個記錄處理額外觸發清理。如果啟用預設的背景清理,則此政策将在堆後端(heap backend)激活,其中包含5個選中的條目,且每個記錄處理不需要清理。

Notes:

  • 如果沒有對狀态進行通路或沒有處理任何記錄,則過期狀态将持久存在。
  • 用于增量清理的時間增加了記錄處理的延遲。
  • 目前,隻有堆狀态後端(heap state backend)實作了狀态的增量清理。為RocksDB設定狀态的增量清理是不起作用的。
  • 如果使用堆狀态後端(heap state backend)進行同步快照(synchronous snapshotting),全局疊代器在疊代時保留所有keys的副本,因為它的特定實作不支援并發修改。啟用此功能将增加記憶體消耗。異步快照沒有這個問題。
  • 對于現有作業job,此清理政策(增量清理)可以在StateTtlConfig中随時激活或停用,例如從儲存點重新啟動後。

Cleanup during RocksDB compaction在RocksDB壓縮期間進行清理

如果使用RocksDB狀态後端,另一個清理政策是激活Flink特定的壓縮過濾器。RocksDB定期運作異步壓縮來合并狀态更新和減少存儲。Flink壓縮過濾器使用TTL檢查狀态項的過期時間戳,并排除過期值。

預設情況下禁用此功能。必須首先在RocksDB後端激活它,通過設定Flink配置選項state.backend. RocksDB .ttl.compaction.filter.enabled,或者在為作業job建立自定義RocksDB狀态後端時,調用RocksDBStateBackend::enableTtlCompactionFilter。然後任何帶有TTL的狀态都可以為其配置和使用過濾器:

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();
           

RocksDB壓縮過濾器将在每次處理一定數量的狀态項之後,從Flink查詢用于檢查過期的目前時間戳。您可以更改它,并将自定義值傳遞給StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)方法。更頻繁地更新時間戳可以提高清理速度,但會降低壓縮性能,因為它使用來自本地代碼的JNI調用。如果啟用預設的背景清理,那麼RocksDB後端将激活此政策,每次處理1000個條目時将查詢目前時間戳。

通過激活FlinkCompactionFilter的調試級别,您可以從RocksDB過濾器的本地代碼中激活調試日志:

log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG
           

Notes:

  • 在壓縮過程中調用TTL過濾器會減慢它的速度。TTL過濾器必須解析上次通路的時間戳,并檢查每個正在壓縮的key的每個存儲狀态條目的過期時間。對于集合狀态類型(list or map),每個存儲的元素也調用TTL過濾器進行檢查。
  • 如果該特性與包含非固定位元組長度元素的清單狀态一起使用,原生TTL過濾器還必須為每個狀态條目額外調用一個Flink java類型序列化器,該序列化器是元素在JNI上的序列化器,其中隻有第一個元素已過期,才能确定下一個未過期元素的偏移量。
  • 對于現有作業job,此清理政策可以在StateTtlConfig中随時激活或停用,例如從儲存點savepoint重新啟動。

State in the Scala DataStream API

除了上面描述的接口之外,Scala API還為KeyedStream上具有單個ValueState的有狀态的map()或flatMap()函數提供了快捷方式。user函數在一個選項中擷取ValueState的目前值,并且必須傳回一個更新後的值,該值将用于更新狀态。

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })
           

Using Managed Operator State

要使用托管操作符算子狀态,有狀态函數可以實作更通用的CheckpointedFunction接口,也可以實作listcheckpoint <T extends Serializable>接口。

CheckpointedFunction

CheckpointedFunction接口通過不同的重新配置設定方案提供對non-keyed 狀态的通路。它需要實作兩個方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;
           

無論何時必須執行檢查點,都會調用snapshotState()。每當初始化使用者定義的函數時,都會調用對應的initializeState(),無論是在函數第一次初始化時,還是在函數從早期檢查點實際恢複時。是以,initializeState()不僅是初始化不同類型狀态的地方,而且是包含狀态恢複邏輯的地方。

目前,支援list樣式的托管操作算子狀态。狀态應該是一個可序列化對象的List,彼此獨立,是以在重新調整時可以重新配置設定。換句話說,這些對象是可以重新分布non-keyed狀态的最細粒度。根據狀态通路方法,定義了以下重分發方案:

  • Even-split redistribution:每個操作算子都會傳回一個狀态state元素清單list。整個狀态在邏輯上是所有清單lists的串聯。在恢複/重新分發時,該清單list被平均地分成盡可能多的并行操作算子子清單sublists 。每個操作算子擷取一個子清單,該子清單可以是空的,也可以包含一個或多個元素。例如,list<list1, list2, ... ,listn> ---> list<list1[e1,e2], list2[],list3[e1,e2,e3], list4[e1]>,如果并行度為1(parallelism =1),則操作算子的檢查點checkpointed 狀态包含元素list1,list2,list3,list4,當将并行度增加到2時,list1,list2可能會出現在操作算子執行個體0中,而list3,list4則會出現在操作算子執行個體1中。
  • Union redistribution:每個操作算子都會傳回一個狀态state元素清單list。整個狀态在邏輯上是所有清單的串聯。在恢複/重新分發時,每個操作算子都會獲得狀态元素的完整清單。例如,list<list1, list2, ... ,listn> ---> list<list1[e1,e2], list2[],list3[e1,e2,e3], list4[e1]>,如果并行度為1,則操作算子的檢查點狀态包含元素list1,list2,list3,list4,當将并行度增加到2時,list1,list2,list3,list4會出現在操作算子執行個體0中,而list1,list2,list3,list4也會出現在操作算子執行個體1中。

下面是一個有狀态SinkFunction的例子,它使用CheckpointedFunction在将元素發送到外部之前緩沖它們。它示範了even-split redistribution模式的清單狀态:

package org.apache.flink.examples.java.dataStremAPI.stateFaultTolerance;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class BufferingSink implements SinkFunction<Tuple3<String, String, Long>>,
   CheckpointedFunction {

   private final int threshold;

   private transient ListState<Tuple3<String, String, Long>> checkpointedState;

   private List<Tuple3<String, String, Long>> bufferedElements;

   public BufferingSink(int threshold) {
      this.threshold = threshold;
      this.bufferedElements = new ArrayList<>();
   }

   @Override
   public void invoke(Tuple3<String, String, Long> value) throws Exception {
      bufferedElements.add(value);
      if (bufferedElements.size() == threshold) {
         for (Tuple3<String, String, Long> element: bufferedElements) {
            // send it to the sink
         }
         bufferedElements.clear();
      }
   }

   /**
    * 每次進行checkpoint前,該方法會被調用,是以需要将list緩存的資料儲存到listState狀态中
    * @param context the context for drawing a snapshot of the operator
    * @throws Exception
    */
   @Override
   public void snapshotState(FunctionSnapshotContext context) throws Exception {
      checkpointedState.clear();//目的是清除前一個checkpoint的狀态資料,儲存新的下一個checkpoint的資料
      for (Tuple3<String, String, Long> element : bufferedElements) {
         checkpointedState.add(element);
      }
   }

   /**
    * 程式重新開機時或者第一次初始化使用者對象方法時,該方法被調用,主要用于程式重新開機時,從checkpoint狀态中恢複資料。
    * @param context the context for initializing the operator
    * @throws Exception
    */
   @Override
   public void initializeState(FunctionInitializationContext context) throws Exception {
      ListStateDescriptor<Tuple3<String, String, Long>> descriptor =
         new ListStateDescriptor<>(
            "buffered-elements",
            TypeInformation.of(new TypeHint<Tuple3<String, String, Long>>() {}));
      //狀态分發政策:Even-split redistribution
      checkpointedState = context.getOperatorStateStore().getListState(descriptor);
       //狀态分發政策:Union redistribution
       //checkpointedState = context.getOperatorStateStore().getUnionListState(descriptor);
      //我們使用上下文的isrestore()方法檢查失敗後是否正在恢複。如果這是真的,即我們正在恢複,則應用恢複邏輯。
      if (context.isRestored()) {
         for (Tuple3<String, String, Long> element : checkpointedState.get()) {
            bufferedElements.add(element);
         }
      }
   }
}  
           

 initializeState方法将FunctionInitializationContext作為參數。這用于初始化non-keyed狀态“containers”。這是一個ListState類型的容器,其中非鍵(non-keyed)狀态對象将在檢查點時存儲。

注意狀态是如何初始化的,類似于keyed state狀态,使用一個StateDescriptor,其中包含狀态名和關于狀态所持有值的類型的資訊:

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);
           

狀态通路方法的命名約定包含其重新分布模式及其狀态結構。例如,要在恢複時使用具有聯合重分發(union redistribution)模式的list state,可以使用getUnionListState(descriptor)通路該狀态。如果方法名不包含重分發模式,例如getListState(descriptor),它僅僅意味着将使用基本的均分重分發(even-split redistribution)模式。

在初始化容器之後,我們使用上下文的isrestore()方法來檢查在失敗之後是否正在恢複。如果這是true,即正在恢複,則應用恢複邏輯。

如修改後的BufferingSink代碼所示,狀态初始化期間恢複的資料儲存在一個ListState變量中,以備将來在snapshotState()中使用。在那裡,ListState将清除前一個檢查點checkpoint包含的所有對象,然後被我們想要檢查的新對象填滿。

另外,鍵控狀态keyed state也可以在initializeState()方法中初始化。這可以使用提供的FunctionInitializationContext來完成。

ListCheckpointed

ListCheckpointed接口是CheckpointedFunction的一個有一定限制的變體接口,它隻支援清單樣式list-style的狀态,在恢複時使用均分重分發(even-split redistribution)模式。它還需要實作兩種方法:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;
           

恢複時處理這樣的清單。如果狀态不可重分區,則随時可以在snapshotState()中傳回collection . singletonlist (MY_STATE)。

Stateful Source Functions

與其他操作算子相比,有狀态sources需要更多的關注。為了對狀态和輸出集合進行原子性更新(對于故障/恢複時的exactly-once語義來說,這是必需的),使用者需要從source上下文擷取一個鎖。

package org.apache.flink.examples.java.dataStremAPI.stateFaultTolerance;

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Collections;
import java.util.List;

public class CounterSource extends RichParallelSourceFunction<Long>
   implements ListCheckpointed<Long> {

   /**  current offset for exactly once semantics */
   private Long offset;

   /** flag for job cancellation */
   private volatile boolean isRunning = true;

   @Override
   public void run(SourceFunction.SourceContext<Long> ctx) {
      final Object lock = ctx.getCheckpointLock();

      while (isRunning) {
         // output and state update are atomic
         synchronized (lock) {
            ctx.collect(offset);
            offset += 1;
         }
      }
   }

   @Override
   public void cancel() {
      isRunning = false;
   }

   @Override
   public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
      //該list隻有一個元素,且能序列化
      return Collections.singletonList(offset);
   }

   @Override
   public void restoreState(List<Long> state) {
      //state.size=1,不知道官網為什麼這樣寫
      for (Long s : state)
         offset = s;
   }
}
           

當Flink完全确認檢查點checkpoint時,一些操作可能需要這些資訊來與外部世界進行通信。在本例中,請參見org.apache.flink.runtime.state.CheckpointListener接口。

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html

https://flink.sojb.cn/dev/stream/state/state.html

https://www.jianshu.com/p/6ed0ef5e2b74

繼續閱讀