天天看点

15.DataStream API之Event Time(Overview)

flink 1.8

Event Time

Event Time / Processing Time / Ingestion Time

Flink在流处理程序中支持不同的时间time概念。

  • Processing time:处理时间是指执行相应操作的机器的系统时间。

当流程序在处理时间processing time上运行时,所有基于时间的操作(比如时间窗口)都将使用运行各自操作算子机器的系统时间。每小时处理时间processing time窗口将包括在系统时钟显示整个小时之间到达特定操作算子的所有记录。例如,如果应用程序在上午9:15开始运行,第一个小时处理时间processing time窗口将包括上午9:15到上午10:00之间处理的事件,下一个窗口将包括上午10:00到11:00之间处理的事件,依此类推。

处理时间processing time是时间的最简单概念,不需要流和机器之间的协调。它提供了最好的性能和最低的延迟。但是,在分布式和异步环境中,处理时间processing time具有不确定性,因为它容易受到记录到达系统的速度(例如从消息队列),记录在系统内部操作算子之间传输的速度,以及停机(调度或其他方式)的影响。

  • Event time:事件时间event time是每个事件在其生产设备上发生的时间。这个时间通常在记录进入Flink流之前包含在每条记录数据中,并且可以从每个记录中提取事件时间戳。在事件时间event time中,时间的进程取决于数据,而不是任何时钟。事件时间event time程序必须指定如何生成事件时间水印,这是表示事件时间进展的机制。这种水印机制将在下面below的一节中描述。

在一个完美的世界中,事件时间event time处理将产生完全一致和确定的结果,而不管事件何时到达或它们的顺序如何。但是,除非已知事件按顺序到达(按时间戳),否则事件时间event time处理会在等待无序事件时产生一些延迟。由于只能等待有限的一段时间,这就限制了事件时间event time应用程序的确定性。

假设所有数据都已到达,事件时间event time操作将按预期运行,即使在处理无序或延迟的事件或重新处理历史数据时,也会产生正确和一致的结果。例如,每小时事件时间event time窗口将包含带有落入该小时的事件时间戳的所有记录,无论它们以何种顺序到达,或在什么时候处理。(有关更多信息,请参阅有关延迟事件late events的部分。)

注意,有时当事件时间event time程序实时处理实时数据时,它们会使用一些处理时间操作,以确保它们以及时的方式进行。

  • Ingestion time: 摄入时间Ingestion time是事件进入Flink的时间。在source operator中,每条记录都以时间戳的形式获取source的当前时间,基于时间的操作(如时间窗口)引用该时间戳。

摄入时间Ingestion time概念上位于事件时间event time和处理时间processing time之间。

与处理时间processing time相比,它稍微重要些,因为它提供了可预测的结果。由于摄取时间Ingestion time使用了更稳定的时间戳(在source上分配一次,即在source算子中给每条数据分配一个时间戳),记录上的不同窗口操作算子将引用相同的时间戳,而在处理时间processing time中,每个窗口操作算子可以将记录分配到不同的窗口(基于本地系统时钟和任何传输延迟进行分配),因此,对于同一个数据,在不同算子中,对应的处理时间通常也不相同。

与事件时间event time相比,摄入时间Ingestion time程序不能处理任何无序的事件或延迟的数据,但程序中不需要指定如何生成水印。

在内部,摄取时间Ingestion time与事件时间event time非常相似,但是摄取时间Ingestion time具有自动时间戳分配器和自动水印生成函数。

15.DataStream API之Event Time(Overview)
15.DataStream API之Event Time(Overview)

Setting a Time Characteristic

Flink DataStream程序的第一部分通常设置基本时间特性。该设置定义了数据源的行为方式(例如,它们是否会分配时间戳),以及KeyedStream.timeWindow(time .seconds(30))等窗口操作应该使用什么时间概念。

下面Flink程序是以小时为单位聚合事件的案例。窗口的行为与时间特性相适应。

package com.suning.brock.timeCharacteristic;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

import java.util.Properties;

public class MyEventDemo {
    public static void main(String[] args) throws Exception{
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // alternatively:
        // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties properties = new Properties();
        properties.setProperty("auto.offset.reset", "largest");
        ExecutionConfig config = new ExecutionConfig();
        FlinkKafkaConsumer08<MyEvent> topic = new FlinkKafkaConsumer08<MyEvent>("topic",
                new TypeInformationSerializationSchema<MyEvent>(TypeInformation.of(MyEvent.class),config), properties);
        DataStream<MyEvent> stream = env.addSource(topic);

        stream.keyBy( (event) -> event.getUser() )
                .timeWindow(Time.hours(1))
                .reduce( (a, b) -> a.add(b) )
                .print();
    }
}
           
package com.suning.brock.timeCharacteristic;

public class MyEvent {
    private String user = "";

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public MyEvent add(MyEvent b) {
        return b;
    }
}
           

注意,为了在事件时间event time模式中运行这个例子,源sources发出的数据必须附带事件时间event time,并且需要自行生成和发送水印watermarks ,或者程序必须在源sources之后插入一个时间戳分配器Timestamp Assigner和水印生成器Watermark Generator 。这些函数描述了如何访问事件时间戳,以及事件流出现的乱序程度。

下一节描述时间戳和水印背后的执行机制。有关如何在Flink DataStream API中使用时间戳分配和水印生成的指南,请参阅生成时间戳/水印 Generating Timestamps / Watermarks。

Event Time and Watermarks

注意:Flink实现了来自数据流模型Dataflow Mode的许多技术。有关事件时间和水印的详细介绍,请参阅下面的文章。

  • Streaming 101 by Tyler Akidau
  • The Dataflow Model paper

支持事件时间event time的流处理器需要一种方法来度量事件时间的进度。例如,当事件时间event time超过一个小时时,需要通知创建小时窗口的操作算子,以便操作算子可以关闭正在运行的窗口。

事件时间Event time可以独立于处理时间processing time(由时钟测量)进行。例如,在一个程序中,操作算子当前的事件时间event time可能稍微落后于处理时间processing time(考虑到接收事件的延迟),而两者都以相同的速度进行。另一方面,另一个流程序通过快速转发处理一些已经在Kafka主题(或另一个消息队列)中缓冲数周的历史数据,可能只需要几秒钟就可以见这些历史数据全部处理完成。

在Flink中,衡量事件时间进展的机制是水印watermarks。水印携带一个时间戳t,并作为数据流的一部分。一个Watermark(t)宣称该流的事件时间已达到时间t,这意味着该流中不应该再出现有元素的时间戳t ' < = t【个人认为应该是t ' < t】的数据(即后面的事件的时间戳都应该大于或等于水印时间戳)。

下图展示的是带有(逻辑时间)时间戳和带有内部流动水印的事件流。在这个例子中,事件是按顺序排列的(基于它们的时间戳),这意味着水印只是流中的周期性标记。

15.DataStream API之Event Time(Overview)

水印watermark对于无序流out-of-order非常重要,如下图所示,在无序流中,事件不是按照它们的时间戳排序的。一般来说,水印是一种声明,通过流中的该点,直到指定时间戳为止的所有事件都已经到达该点。水印到达操作算子后,操作算子可以使用内部事件时间event time时钟来更新水印watermark的值。

15.DataStream API之Event Time(Overview)

注意,事件时间event time由新创建的流元素(或元素)继承,这些流元素或来自生成它们的事件,或来自触发这些元素创建的水印。

Watermarks in Parallel Streams并行流水印

水印在源函数source functions处生成,或者直接在源函数source functions之后生成。源函数source functions的每个并行子任务通常独立地生成各自的水印。这些水印定义了该特定并行source的事件时间。

当水印流经流程序时,它们会更新所到之处的操作算子的事件时间。无论操作算子什么时候更新它的事件时间,它会为它的后续操作算子生成一个新的水印。

一些操作算子使用多个输入流;例如,在keyBy(…)或partition(…)函数之后的union或运算符。此类操作符的当前事件时间是其输入流的事件时间的最小值。当它的输入流更新它们的事件时间时,操作符也更新它们。

一些操作算子使用多个输入流。union操作算子,例如:或者keyBy(…) 或 partition(…)函数之后的操作算子。这样的操作算子的当前事件时间是它的输入流的事件时间的最小值。当输入流更新它们的事件时间,操作算子也会更新。

下图显示了一个通过并行流流动的事件和水印,以及操作算子跟踪事件时间的示例。

15.DataStream API之Event Time(Overview)

注意Kafka source支持每个分区都维护着各自的水印,您可以在这里here阅读更多信息。

Late Elements

某些元素可能会违反水印条件,这意味着即使Watermark(t)已经发生,也会出现更多具有时间戳 t ' <= t的元素。事实上,在实际应用中,某些元素可能被任意延迟,因此不可能指定某个事件时间戳的所有元素发生的时间。因此,延迟是有限制的,将水印延迟太多通常也是不可取的,因为这会导致事件时间窗的计算延迟太多。

基于这个原因,流程序可能会明确地期望一些延迟的元素。延迟元素是在系统的事件时间时钟(由水印watermarks发出信号)已经通过了延迟元素的时间戳之后到达的元素。有关如何在事件时间窗口中处理延迟元素的更多信息,请参见“Lateness”。

Idling sources

目前,对于纯事件时间水印生成器,如果没有要处理的元素,水印的时间戳将无法继续增长前进。这意味着,在输入数据出现间隙的情况下,事件时间将不会继续前进,例如窗口操作算子将不会被触发,因此现有窗口将无法输出任何数据。

为了避免这种情况,可以使用周期水印分配器periodic watermark assigners (实现该接口AssignerWithPeriodicWatermarks),它不仅基于元素时间戳进行赋值。该问题的解决方案是通过使用一个分配器assigner ,该分配器assigner在一段时间没有观察到新事件产生后,切换到使用当前处理时间作为时间基础的分配器。

源Sources可以标记为在空闲时使用SourceFunction.SourceContext#markAsTemporarilyIdle。有关详细信息,请参阅此方法的Javadoc以及StreamStatus。

Debugging Watermarks

有关运行时调试水印,请参阅调试窗口和事件时间 Debugging Windows & Event Time部分。

How operators are processing watermarks操作算子如何处理水印

一般来说,操作算子operators需要在水印发送到下游之前对其进行处理。例如,WindowOperator将首先评估应该触发哪个窗口,并且只有在生成由水印触发的所有输出之后,水印本身才会被发送到下游。换句话说,由于水印的出现而产生的所有元素都会在水印之前发出。

/**
 * An operator that implements the logic for windowing based on a {@link WindowAssigner} and
 * {@link Trigger}.
 *
 * <p>When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
 * assigned to zero or more windows using a {@link WindowAssigner}. Based on this, the element
 * is put into panes. A pane is the bucket of elements that have the same key and same
 * {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by the
 * {@code WindowAssigner}.
 *
 * <p>Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
 * the contents of the pane should be processed to emit results. When a trigger fires,
 * the given {@link InternalWindowFunction} is invoked to produce the results that are emitted for
 * the pane to which the {@code Trigger} belongs.
 *
 * @param <K> The type of key returned by the {@code KeySelector}.
 * @param <IN> The type of the incoming elements.
 * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
 */
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
   extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
   implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {...}
           

同样的规则也适用于TwoInputStreamOperator。然而,在这种情况下,操作算子的当前水印被定义为其两个输入的最小值,(取两个流中的最小水印时间戳---》Min(w(t1),w(t2)))。

/**
 * Interface for stream operators with two inputs. Use
 * {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if
 * you want to implement a custom operator.
 *
 * @param <IN1> The input type of the operator
 * @param <IN2> The input type of the operator
 * @param <OUT> The output type of the operator
 */
@PublicEvolving
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {...}
           

此行为的详细信息由OneInputStreamOperator#processWatermark, TwoInputStreamOperator#processWatermark1 和 TwoInputStreamOperator#processWatermark2方法的实现定义。

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html

https://flink.sojb.cn/dev/event_time.html

https://www.jianshu.com/p/6727dfe0aaee 

继续阅读