天天看点

29.DataStream API之Operators(Process Function)

flink 1.9

The ProcessFunction

ProcessFunction是一个低级的流处理操作,可以访问所有(非循环)流应用程序的基本组件:

  • Events(流元素)
  • state (容错, 一致性,只在KeyedStream上)
  • timers (事件时间和处理时间,只在KeyedStream上)

可以将ProcessFunction看做是具备访问keyed状态和定时器的FlatMapFunction。它通过invoked方法处理从输入流接收到的每个事件。

对于容错状态,ProcessFunction通过RuntimeContext访问Flink的keyed状态keyed state,类似于有状态的函数访问keyed状态。

定时器允许应用程序基于处理时间和事件时间event time响应变化。每次调用函数的 processElement(...)方法获得一个Context对象,它可以访问元素的事件时间戳,和对TimerService的访问。TimerService可以用来为将来的事件/处理时间注册回调。当定时器的达到定时时间时,会调用onTimer(...) 方法。

注意:您想访问keyed状态和定时器,则必须在键控流上应用ProcessFunction:

stream.keyBy(...).process(new MyProcessFunction())
           

Low-level Joins

为了在两个输入上实现低级别的操作,应用可以使用CoProcessFunction。该函数绑定到两个不同的输入,并为不同的输入记录分别单独调用processElement1(…)和processElement2(…)。

实现低级别join通常遵循以下模式:

  • 为一个(或两个)输入创建一个状态对象。
  • 当从输入源收到元素时,更新状态
  • 当从输入源收到元素时,探测状态并生成join的结果。

例如:你可能将客户数据连接到金融交易中,同时保存客户信息的状态,如果您关心在无序事件的情况下进行完整和确定性的连接,则当客户数据流的水印已经通过交易时,您可以使用计时器来评估和发布交易的连接。

Demo:

//数据源
package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class DataSource1 extends RichParallelSourceFunction<Tuple5<String, Integer, Long, String, String>> {
   private volatile boolean running = true;

   @Override
   public void run(SourceContext<Tuple5<String, Integer, Long, String, String>> ctx) throws InterruptedException {

//source2
//    Tuple3[] elements = new Tuple3[]{
//       Tuple3.of("a", "1", 1551169050000L),
//       Tuple3.of("aa", "33", 1551169064000L),
//       Tuple3.of("a", "2", 1551169054000L),
//       Tuple3.of("a", "3", 1551169064000L),
//       Tuple3.of("b", "5", 1551169100000L),
//       Tuple3.of("a", "4", 1551169079999L),
//       Tuple3.of("aa1", "44", 1551169079000L),
//       Tuple3.of("b", "6", 1551169108000L)
//    };
//source1
      Tuple5[] elements1 = new Tuple5[]{

         Tuple5.of("a", 2, 1551169050002L, "w", "e"),
         Tuple5.of("a", 1, 1551169050002L, "2", "1"),
         Tuple5.of("a", 1, 1551169050001L, "3", "1"),
         Tuple5.of("aa", 33, 1551169064000L, "4", "1"),
         Tuple5.of("a", 2, 1551169054000L, "5", "1"),
         Tuple5.of("a", 1, 1551169050003L, "6", "1"),
         Tuple5.of("a", 3, 1551169064000L, "7", "1"),
         Tuple5.of("b", 5, 1551169100000L, "8", "1"),
         Tuple5.of("a", 4, 1551169079000L, "9", "1"),
         Tuple5.of("aa", 44, 1551169079000L, "10", "1"),
         Tuple5.of("b", 6, 1551169108000L, "11", "1")
      };

      int count = 0;
      while (running && count < elements1.length) {
         ctx.collect(new Tuple5<>((String)elements1[count].f0,(Integer)elements1[count].f1,(Long)elements1[count].f2,
            (String)elements1[count].f3,(String)elements1[count].f4));
         count++;
         Thread.sleep(300);
      }
   }

   @Override
   public void cancel() {
      running = false;
   }
}

//主函数
package org.apache.flink.examples.java.dataStremAPI.operators.joining;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.examples.java.dataStremAPI.operators.DataSource1;
import org.apache.flink.examples.java.dataStremAPI.operators.DataSource2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class CoProcessFunctionDemo {
   public static void main(String[] args) {
      Long delay = 5000L;
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(2);

      // 设置数据源
      DataStream<Tuple5<String, Integer, Long, String, String>> source = env.addSource(new DataSource1()).setParallelism(1).name("Demo Source");
      DataStream<Tuple3<String, Integer, Long>> source1 = env.addSource(new DataSource2()).setParallelism(1).name("Demo Source1");


      // 设置水位线
      DataStream<Tuple5<String, Integer, Long, String, String>> stream = source.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple5<String, Integer, Long, String, String>>(Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple5<String, Integer, Long, String, String> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      );
      // 设置水位线
      DataStream<Tuple3<String, Integer, Long>> stream1 = source1.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Long>>(Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple3<String, Integer, Long> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      );

      stream.connect(stream1).keyBy(0,0).process(new CoProcessFunction<Tuple5<String, Integer, Long, String, String>, Tuple3<String, Integer, Long>, Object>() {
         @Override
         public void processElement1(Tuple5<String, Integer, Long, String, String> value, Context ctx, Collector<Object> out) throws Exception {
         }

         @Override
         public void processElement2(Tuple3<String, Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
         }
      });

   }
}
           

Example

在下面的例子中,KeyedProcessFunction维护每个键key的计数,并且在没有更新该键key的情况下,每过一分钟(在事件时间中)就会发出一个key/count对:

  • 数量,key和最后一次更新时间都存储在ValueState中,它由key隐式关联。
  • 对于每个记录,ProcessFunction将数量加一并且设置最后一次更新时间
  • function将会在一分钟后(事件时间)调度回调
  • 每当回调执行时,它检查回调的事件时间戳和存储的最后一次更新时间,如果匹配(也就是说,一分钟内没有更新)则下发key/count对

注意:这个简单的示例可以通过会话窗口实现。我们在这里使用ProcessFunction来说明它提供的基本模式。

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;


// the source data stream
DataStream<Tuple2<String, String>> stream = ...;

// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}
           

注意:在Flink1.4.0之前,当调用来自处理时间(processing-time)的定时器时,ProcessFunction.onTimer()方法设置当前处理时间(processing time)为事件时间(event-time)戳。这种行为非常微妙,用户可能不会注意到。嗯,这是有害的,因为处理时间(processing-time)是不确定的并且不与水位对齐。此外,用户实现的逻辑依赖于这个错误的时间戳很可能是无意的错误。因此我们决定修复它。在升级到1.4.0时,使用这个错误事件时间(event-time)戳的Flink作业将会失败,用户应该将他们的工作调整为正确的逻辑。

说白了就是,对应基于事件时间(event time)模式下的应用程序,在ProcessFunction类中,当注册定时器时,不可以使用ctx.timerService().currentProcessingTime();否则程序将无法提交成功。

The KeyedProcessFunction

KeyedProcessFunction作为ProcessFunction的扩展,在其onTimer(…)方法中提供了对定时器key值的访问。 

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
    K key = ctx.getCurrentKey();
    // ...
}
           

Timers

两种类型的定时器(processing-time and event-time)都由TimerService在内部维护,并排队等待执行。

TimerService对每个键和时间戳的定时器进行重复删除,即每个键和时间戳最多有一个定时器(对于相同key和时间戳的多个元素,虽然每个元素都会注册一个定时器,但是TimerService调度器会进行覆盖,因此,onTImer仅会被调用一次)。

注意,Flink本身实现了onTimer()和processElement()的同步调用。因此,用户不必担心状态的并发修改。

Fault Tolerance

定时器是容错的,并随着应用程序的状态进行检查点。在故障恢复或从保存点启动应用程序时,计时器将被恢复。

注意:当应用程序从失败中恢复或从保存点启动时,检查点checkpoint处理时间 processing-time定时器,应该在它们恢复之前被触发,且应该立即被触发。

注意:定时器总是异步进行checkpoint的,除了结合了RocksDB后端/增量快照/基于堆 heap-based的定时器(将用FLINK-10026解决)。注意,由于定时器是检查点状态的一部分,因此,大量定时器会增加checkpoint的时间,。有关如何减少定时器数量的方法,请参阅“Timer Coalescing”。

Timer Coalescing

由于Flink对每个键key和时间戳只维护一个定时器,因此可以通过降低定定时器分辨率(即将时间戳控制在秒级单位,而不是毫秒级)来合并它们,以便减少定时器的数量。

对于定时器时间单位为1秒(event or processing time)的情况,可以将目标时间的毫秒进行向下舍入到秒(举例:1553926272499---》1553926272000、1553926272956---》1553926272000)。定时器的触发时间最多比请求的时间早1秒,但不会晚于要求的毫秒精度。因此,对于秒级时间戳内(举例1553926272100、1553926272220、1553926272510、... 、1553926272980,这些时间戳的元素经过向下取舍后都是1553926272000,他们将会合并成一个定时器),在时间戳的单位秒内,每个键key最多有一个定时器。

long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000;
ctx.timerService().registerProcessingTimeTimer(coalescedTime);
           

由于事件时间( event-time )定时器只在出现水印时触发,您还可以通过利用当前水印来调度这些计时器并将其与下一个水印相结合。

long coalescedTime = ctx.timerService().currentWatermark() + 1;
ctx.timerService().registerEventTimeTimer(coalescedTime);
           

 定时器也可以停止和删除如下:

停止处理时间计时器:

long timestampOfTimerToStop = ...
ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
           

停止事件时间计时器:

long timestampOfTimerToStop = ...
ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);
           

注意:如果没有注册具有给定时间戳的定时器,则停止定时器将不起作用。

https://www.jianshu.com/p/e6297fac67cb

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html

继续阅读