天天看點

15.DataStream API之Event Time(Overview)

flink 1.8

Event Time

Event Time / Processing Time / Ingestion Time

Flink在流處理程式中支援不同的時間time概念。

  • Processing time:處理時間是指執行相應操作的機器的系統時間。

當流程式在處理時間processing time上運作時,所有基于時間的操作(比如時間視窗)都将使用運作各自操作算子機器的系統時間。每小時處理時間processing time視窗将包括在系統時鐘顯示整個小時之間到達特定操作算子的所有記錄。例如,如果應用程式在上午9:15開始運作,第一個小時處理時間processing time視窗将包括上午9:15到上午10:00之間處理的事件,下一個視窗将包括上午10:00到11:00之間處理的事件,依此類推。

處理時間processing time是時間的最簡單概念,不需要流和機器之間的協調。它提供了最好的性能和最低的延遲。但是,在分布式和異步環境中,處理時間processing time具有不确定性,因為它容易受到記錄到達系統的速度(例如從消息隊列),記錄在系統内部操作算子之間傳輸的速度,以及停機(排程或其他方式)的影響。

  • Event time:事件時間event time是每個事件在其生産裝置上發生的時間。這個時間通常在記錄進入Flink流之前包含在每條記錄資料中,并且可以從每個記錄中提取事件時間戳。在事件時間event time中,時間的程序取決于資料,而不是任何時鐘。事件時間event time程式必須指定如何生成事件時間水印,這是表示事件時間進展的機制。這種水印機制将在下面below的一節中描述。

在一個完美的世界中,事件時間event time處理将産生完全一緻和确定的結果,而不管事件何時到達或它們的順序如何。但是,除非已知事件按順序到達(按時間戳),否則事件時間event time處理會在等待無序事件時産生一些延遲。由于隻能等待有限的一段時間,這就限制了事件時間event time應用程式的确定性。

假設所有資料都已到達,事件時間event time操作将按預期運作,即使在處理無序或延遲的事件或重新處理曆史資料時,也會産生正确和一緻的結果。例如,每小時事件時間event time視窗将包含帶有落入該小時的事件時間戳的所有記錄,無論它們以何種順序到達,或在什麼時候處理。(有關更多資訊,請參閱有關延遲事件late events的部分。)

注意,有時當事件時間event time程式實時處理實時資料時,它們會使用一些處理時間操作,以確定它們以及時的方式進行。

  • Ingestion time: 攝入時間Ingestion time是事件進入Flink的時間。在source operator中,每條記錄都以時間戳的形式擷取source的目前時間,基于時間的操作(如時間視窗)引用該時間戳。

攝入時間Ingestion time概念上位于事件時間event time和處理時間processing time之間。

與處理時間processing time相比,它稍微重要些,因為它提供了可預測的結果。由于攝取時間Ingestion time使用了更穩定的時間戳(在source上配置設定一次,即在source算子中給每條資料配置設定一個時間戳),記錄上的不同視窗操作算子将引用相同的時間戳,而在處理時間processing time中,每個視窗操作算子可以将記錄配置設定到不同的視窗(基于本地系統時鐘和任何傳輸延遲進行配置設定),是以,對于同一個資料,在不同算子中,對應的處理時間通常也不相同。

與事件時間event time相比,攝入時間Ingestion time程式不能處理任何無序的事件或延遲的資料,但程式中不需要指定如何生成水印。

在内部,攝取時間Ingestion time與事件時間event time非常相似,但是攝取時間Ingestion time具有自動時間戳配置設定器和自動水印生成函數。

15.DataStream API之Event Time(Overview)
15.DataStream API之Event Time(Overview)

Setting a Time Characteristic

Flink DataStream程式的第一部分通常設定基本時間特性。該設定定義了資料源的行為方式(例如,它們是否會配置設定時間戳),以及KeyedStream.timeWindow(time .seconds(30))等視窗操作應該使用什麼時間概念。

下面Flink程式是以小時為機關聚合事件的案例。視窗的行為與時間特性相适應。

package com.suning.brock.timeCharacteristic;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

import java.util.Properties;

public class MyEventDemo {
    public static void main(String[] args) throws Exception{
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // alternatively:
        // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties properties = new Properties();
        properties.setProperty("auto.offset.reset", "largest");
        ExecutionConfig config = new ExecutionConfig();
        FlinkKafkaConsumer08<MyEvent> topic = new FlinkKafkaConsumer08<MyEvent>("topic",
                new TypeInformationSerializationSchema<MyEvent>(TypeInformation.of(MyEvent.class),config), properties);
        DataStream<MyEvent> stream = env.addSource(topic);

        stream.keyBy( (event) -> event.getUser() )
                .timeWindow(Time.hours(1))
                .reduce( (a, b) -> a.add(b) )
                .print();
    }
}
           
package com.suning.brock.timeCharacteristic;

public class MyEvent {
    private String user = "";

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public MyEvent add(MyEvent b) {
        return b;
    }
}
           

注意,為了在事件時間event time模式中運作這個例子,源sources發出的資料必須附帶事件時間event time,并且需要自行生成和發送水印watermarks ,或者程式必須在源sources之後插入一個時間戳配置設定器Timestamp Assigner和水印生成器Watermark Generator 。這些函數描述了如何通路事件時間戳,以及事件流出現的亂序程度。

下一節描述時間戳和水印背後的執行機制。有關如何在Flink DataStream API中使用時間戳配置設定和水印生成的指南,請參閱生成時間戳/水印 Generating Timestamps / Watermarks。

Event Time and Watermarks

注意:Flink實作了來自資料流模型Dataflow Mode的許多技術。有關事件時間和水印的詳細介紹,請參閱下面的文章。

  • Streaming 101 by Tyler Akidau
  • The Dataflow Model paper

支援事件時間event time的流處理器需要一種方法來度量事件時間的進度。例如,當事件時間event time超過一個小時時,需要通知建立小時視窗的操作算子,以便操作算子可以關閉正在運作的視窗。

事件時間Event time可以獨立于處理時間processing time(由時鐘測量)進行。例如,在一個程式中,操作算子目前的事件時間event time可能稍微落後于處理時間processing time(考慮到接收事件的延遲),而兩者都以相同的速度進行。另一方面,另一個流程式通過快速轉發處理一些已經在Kafka主題(或另一個消息隊列)中緩沖數周的曆史資料,可能隻需要幾秒鐘就可以見這些曆史資料全部處理完成。

在Flink中,衡量事件時間進展的機制是水印watermarks。水印攜帶一個時間戳t,并作為資料流的一部分。一個Watermark(t)宣稱該流的事件時間已達到時間t,這意味着該流中不應該再出現有元素的時間戳t ' < = t【個人認為應該是t ' < t】的資料(即後面的事件的時間戳都應該大于或等于水印時間戳)。

下圖展示的是帶有(邏輯時間)時間戳和帶有内部流動水印的事件流。在這個例子中,事件是按順序排列的(基于它們的時間戳),這意味着水印隻是流中的周期性标記。

15.DataStream API之Event Time(Overview)

水印watermark對于無序流out-of-order非常重要,如下圖所示,在無序流中,事件不是按照它們的時間戳排序的。一般來說,水印是一種聲明,通過流中的該點,直到指定時間戳為止的所有事件都已經到達該點。水印到達操作算子後,操作算子可以使用内部事件時間event time時鐘來更新水印watermark的值。

15.DataStream API之Event Time(Overview)

注意,事件時間event time由新建立的流元素(或元素)繼承,這些流元素或來自生成它們的事件,或來自觸發這些元素建立的水印。

Watermarks in Parallel Streams并行流水印

水印在源函數source functions處生成,或者直接在源函數source functions之後生成。源函數source functions的每個并行子任務通常獨立地生成各自的水印。這些水印定義了該特定并行source的事件時間。

當水印流經流程式時,它們會更新所到之處的操作算子的事件時間。無論操作算子什麼時候更新它的事件時間,它會為它的後續操作算子生成一個新的水印。

一些操作算子使用多個輸入流;例如,在keyBy(…)或partition(…)函數之後的union或運算符。此類操作符的目前事件時間是其輸入流的事件時間的最小值。當它的輸入流更新它們的事件時間時,操作符也更新它們。

一些操作算子使用多個輸入流。union操作算子,例如:或者keyBy(…) 或 partition(…)函數之後的操作算子。這樣的操作算子的目前事件時間是它的輸入流的事件時間的最小值。當輸入流更新它們的事件時間,操作算子也會更新。

下圖顯示了一個通過并行流流動的事件和水印,以及操作算子跟蹤事件時間的示例。

15.DataStream API之Event Time(Overview)

注意Kafka source支援每個分區都維護着各自的水印,您可以在這裡here閱讀更多資訊。

Late Elements

某些元素可能會違反水印條件,這意味着即使Watermark(t)已經發生,也會出現更多具有時間戳 t ' <= t的元素。事實上,在實際應用中,某些元素可能被任意延遲,是以不可能指定某個事件時間戳的所有元素發生的時間。是以,延遲是有限制的,将水印延遲太多通常也是不可取的,因為這會導緻事件時間窗的計算延遲太多。

基于這個原因,流程式可能會明确地期望一些延遲的元素。延遲元素是在系統的事件時間時鐘(由水印watermarks發出信号)已經通過了延遲元素的時間戳之後到達的元素。有關如何在事件時間視窗中處理延遲元素的更多資訊,請參見“Lateness”。

Idling sources

目前,對于純事件時間水印生成器,如果沒有要處理的元素,水印的時間戳将無法繼續增長前進。這意味着,在輸入資料出現間隙的情況下,事件時間将不會繼續前進,例如視窗操作算子将不會被觸發,是以現有視窗将無法輸出任何資料。

為了避免這種情況,可以使用周期水印配置設定器periodic watermark assigners (實作該接口AssignerWithPeriodicWatermarks),它不僅基于元素時間戳進行指派。該問題的解決方案是通過使用一個配置設定器assigner ,該配置設定器assigner在一段時間沒有觀察到新事件産生後,切換到使用目前處理時間作為時間基礎的配置設定器。

源Sources可以标記為在空閑時使用SourceFunction.SourceContext#markAsTemporarilyIdle。有關詳細資訊,請參閱此方法的Javadoc以及StreamStatus。

Debugging Watermarks

有關運作時調試水印,請參閱調試視窗和事件時間 Debugging Windows & Event Time部分。

How operators are processing watermarks操作算子如何處理水印

一般來說,操作算子operators需要在水印發送到下遊之前對其進行處理。例如,WindowOperator将首先評估應該觸發哪個視窗,并且隻有在生成由水印觸發的所有輸出之後,水印本身才會被發送到下遊。換句話說,由于水印的出現而産生的所有元素都會在水印之前發出。

/**
 * An operator that implements the logic for windowing based on a {@link WindowAssigner} and
 * {@link Trigger}.
 *
 * <p>When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
 * assigned to zero or more windows using a {@link WindowAssigner}. Based on this, the element
 * is put into panes. A pane is the bucket of elements that have the same key and same
 * {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by the
 * {@code WindowAssigner}.
 *
 * <p>Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
 * the contents of the pane should be processed to emit results. When a trigger fires,
 * the given {@link InternalWindowFunction} is invoked to produce the results that are emitted for
 * the pane to which the {@code Trigger} belongs.
 *
 * @param <K> The type of key returned by the {@code KeySelector}.
 * @param <IN> The type of the incoming elements.
 * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
 */
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
   extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
   implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {...}
           

同樣的規則也适用于TwoInputStreamOperator。然而,在這種情況下,操作算子的目前水印被定義為其兩個輸入的最小值,(取兩個流中的最小水印時間戳---》Min(w(t1),w(t2)))。

/**
 * Interface for stream operators with two inputs. Use
 * {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if
 * you want to implement a custom operator.
 *
 * @param <IN1> The input type of the operator
 * @param <IN2> The input type of the operator
 * @param <OUT> The output type of the operator
 */
@PublicEvolving
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {...}
           

此行為的詳細資訊由OneInputStreamOperator#processWatermark, TwoInputStreamOperator#processWatermark1 和 TwoInputStreamOperator#processWatermark2方法的實作定義。

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html

https://flink.sojb.cn/dev/event_time.html

https://www.jianshu.com/p/6727dfe0aaee 

繼續閱讀