天天看點

[Flink]Flink1.3 Stream指南五 視窗觸發器與驅逐器

1. 視窗觸發器 http://gitlab.corp.qunar.com/jifeng.si/learningnotes/blob/master/IT/%E5%A4%A7%E6%95%B0%E6%8D%AE/Flink/%5BFlink%5DFlink1.3%20Stream%E6%8C%87%E5%8D%97%E4%BA%94%20%E7%AA%97%E5%8F%A3%E8%A7%A6%E5%8F%91%E5%99%A8%E4%B8%8E%E9%A9%B1%E9%80%90%E5%99%A8.md#1

觸發器(Trigger)确定視窗(由

視窗配置設定器

形成)何時準備好被

視窗函數

處理。每個視窗配置設定器都帶有預設觸發器。如果預設觸發器不滿足你的要求,可以使用

trigger(...)

指定自定義觸發器。

觸發器接口有五種方法允許觸發器對不同的事件做出反應:

public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

public void onMerge(W window, OnMergeContext ctx) throws Exception {
    throw new UnsupportedOperationException("This trigger does not support merging.");
}

public abstract void clear(W window, TriggerContext ctx) throws Exception;
           

(1) onElement()方法,當每個元素被添加視窗時調用。

(2) onEventTime()方法,當注冊的事件時間計時器(event-time timer)觸發時調用。

(3) onProcessingTime()方法,當注冊的處理時間計時器(processing-time timer)觸發時調用。

(4) onMerge()方法,與狀态觸發器相關,并且在相應的視窗合并時合并兩個觸發器的狀态。例如,使用會話視窗時。

(5) clear()方法,執行删除相應視窗所需的任何操作(performs any action needed upon removal of the corresponding window)。

以上方法有兩件事要注意:

(1) 前三個函數決定了如何通過傳回一個

TriggerResult

來對其調用事件采取行動。

TriggerResult

可以是以下之一:

public enum TriggerResult {
  // 什麼都不做
    CONTINUE(false, false),

  // 觸發計算,然後清除視窗中的元素
    FIRE_AND_PURGE(true, true),

  // 觸發計算
    FIRE(true, false),

  // 清除視窗中的元素
    PURGE(false, true);
}
           

(2) 上面任何方法都可以用于注冊處理時間計時器或事件時間計時器以供将來的操作使用。

1.1 觸發與清除 http://gitlab.corp.qunar.com/jifeng.si/learningnotes/blob/master/IT/%E5%A4%A7%E6%95%B0%E6%8D%AE/Flink/%5BFlink%5DFlink1.3%20Stream%E6%8C%87%E5%8D%97%E4%BA%94%20%E7%AA%97%E5%8F%A3%E8%A7%A6%E5%8F%91%E5%99%A8%E4%B8%8E%E9%A9%B1%E9%80%90%E5%99%A8.md#1-1

一旦觸發器确定視窗準備好處理資料,它将觸發,例如,它傳回

FIRE

FIRE_AND_PURGE

。這是視窗算子給目前視窗發送結果的信号。給定一個帶有

WindowFunction

的視窗,所有的元素都被傳遞給

WindowFunction

(可能在将所有元素傳遞給evictor之後)。帶有

ReduceFunction

或者

FoldFunction

的視窗隻是簡單地發出他們急切希望得到的聚合結果(Windows with ReduceFunction of FoldFunction simply emit their eagerly aggregated result.)。

觸發器觸發時,可以是

FIRE

FIRE_AND_PURGE

。當是

FIRE

時保留視窗的内容,當時

FIRE_AND_PURGE

時會删除其内容。預設情況下,内置的觸發器隻傳回

FIRE

,不會清除視窗狀态(the pre-implemented triggers simply FIRE without purging the window state)。

備注:

清除隻是簡單地删除視窗的内容,并留下關于視窗和任何觸發狀态的任何潛在元資訊。
           

1.2 視窗配置設定器的預設觸發器 http://gitlab.corp.qunar.com/jifeng.si/learningnotes/blob/master/IT/%E5%A4%A7%E6%95%B0%E6%8D%AE/Flink/%5BFlink%5DFlink1.3%20Stream%E6%8C%87%E5%8D%97%E4%BA%94%20%E7%AA%97%E5%8F%A3%E8%A7%A6%E5%8F%91%E5%99%A8%E4%B8%8E%E9%A9%B1%E9%80%90%E5%99%A8.md#1-2

視窗配置設定器的預設觸發器适用于許多情況。例如,所有的事件時間視窗配置設定器都有一個

EventTimeTrigger

作為預設觸發器。一旦watermark到達視窗的末尾,這個觸發器就會被觸發。

全局視窗(GlobalWindow)的預設觸發器是永不觸發的NeverTrigger。是以,在使用全局視窗時,必須自定義一個觸發器。
           
通過使用trigger()方法指定觸發器,将會覆寫視窗配置設定器的預設觸發器。例如,如果你為TumblingEventTimeWindows指定CountTrigger,則不會再根據時間的進度啟動視窗函數,而隻能通過計數。現在,如果你希望基于時間和個數進行觸發,則必須編寫自己的自定義觸發器。
           

1.3 内置觸發器和自定義觸發器 http://gitlab.corp.qunar.com/jifeng.si/learningnotes/blob/master/IT/%E5%A4%A7%E6%95%B0%E6%8D%AE/Flink/%5BFlink%5DFlink1.3%20Stream%E6%8C%87%E5%8D%97%E4%BA%94%20%E7%AA%97%E5%8F%A3%E8%A7%A6%E5%8F%91%E5%99%A8%E4%B8%8E%E9%A9%B1%E9%80%90%E5%99%A8.md#1-3

Flink帶有一些内置觸發器:

(1) 

EventTimeTrigger

,根據watermarks測量的事件時間進度觸發。

(2) 

ProcessingTimeTrigger

,基于處理時間觸發。

(3) 

CountTrigger

,一旦視窗中的元素數量超過給定限制就會觸發。

(4) 

PurgingTrigger

将其作為另一個觸發器的參數,并将其轉換為帶有清除功能(transforms it into a purging one)。

如果需要實作一個自定義的觸發器,你應該看看

Trigger

抽象類。請注意,API仍在發展中,在Flink未來版本中可能會發生改變。

2. 視窗驅逐器 http://gitlab.corp.qunar.com/jifeng.si/learningnotes/blob/master/IT/%E5%A4%A7%E6%95%B0%E6%8D%AE/Flink/%5BFlink%5DFlink1.3%20Stream%E6%8C%87%E5%8D%97%E4%BA%94%20%E7%AA%97%E5%8F%A3%E8%A7%A6%E5%8F%91%E5%99%A8%E4%B8%8E%E9%A9%B1%E9%80%90%E5%99%A8.md#2

Flink的視窗模型允許在視窗配置設定器和觸發器之外指定一個可選的驅逐器(Evictor)。這可以使用

evictor(...)

方法來完成。驅逐器能夠在觸發器觸發之後,在應用視窗函數之前或之後從視窗中移除元素,也可以之前之後都删除元素( The evictor has the ability to remove elements from a window after the trigger fires and before and/or after the window function is applied.)。為此,Evictor接口有兩種方法:

/**
 * Optionally evicts elements. Called before windowing function.
 *
 * @param elements The elements currently in the pane.
 * @param size The current number of elements in the pane.
 * @param window The {@link Window}
 * @param evictorContext The context for the Evictor
 */
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

/**
 * Optionally evicts elements. Called after windowing function.
 *
 * @param elements The elements currently in the pane.
 * @param size The current number of elements in the pane.
 * @param window The {@link Window}
 * @param evictorContext The context for the Evictor
 */
 void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
           

evictBefore()

包含驅逐邏輯,在視窗函數之前應用。而

evictAfter()

在視窗函數之後應用。在應用視窗函數之前被逐出的元素将不被處理。

Flink帶有三個内置的驅逐器:

(1) CountEvictor:保持視窗内使用者指定數量的元素,如果多于使用者指定的數量,從視窗緩沖區的開頭丢棄剩餘的元素。

(2) DeltaEvictor:使用

DeltaFunction

和門檻值,計算視窗緩沖區中的最後一個元素與其餘每個元素之間的delta值,并删除delta值大于或等于門檻值的元素(computes the delta between the last element in the window buffer and each of the remaining ones, and removes the ones with a delta greater or equal to the threshold)。

(3) TimeEvictor:以毫秒為機關的時間間隔作為參數,對于給定的視窗,找到元素中的最大的時間戳

max_ts

,并删除時間戳小于

max_ts - interval

的所有元素。

預設情況下,所有内置的驅逐器在視窗函數之前應用。
           
指定驅逐器可阻止任何的預聚合(pre-aggregation),因為視窗的所有元素必須在應用計算之前傳遞給驅逐器。
           
Flink不保證視窗内元素的順序。 這意味着雖然驅逐者可以從視窗的開頭移除元素,但這些元素不一定是先到的還是後到的。
           
Flink版本:1.3
           

原文: 

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#triggers

繼續閱讀