Flink學習筆記-Timer&Watermark
-
- 事件時間
-
- API
- 攝入時間
-
- 處理時間
- 各類時間之間的關系
- 水位線
-
- WaterMark生成方式
-
- Source Function方式
- TimeStampAssigner方式
-
- PeriodicWatermarkAssigner
- PunctuatedWatermarkAssigner
- 總結
flink在計算過程中支援不同次元的時間概念,包含事件時間(Event Time),攝入時間(Ingestion Time),處理時間(Processing Time);
事件時間描述的是處理依賴的事件源産生事件的時間,也就是說在事件進入flink之前就已經以時間戳的方式嵌入到事件中了,其具有不可變的屬性,事件時間的優點在于可以讓我們還原整個事件生成過程中的先後順序,天然支援對事件産生順序敏感的業務場景。我們可以在flink中對事件時間進行抽取(具體抽取方式可以參考org.apache.flink.streaming.api.functions.TimestampAssigner或者org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks),然後與水位線(watermark)以及window操作結合起來處理亂序事件。
業務如果需要用到事件時間則需要顯示的在執行環境中指定:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
抽取事件時間的簡單示例(一般不會直接用TimestampAssigner)
該例中假設事件類型是String,且通過 ‘-’ 連接配接事件産生時間以及具體事件内容
public class MyTimestampAssigner implements TimestampAssigner<String>{
// 抽取事件時間
@Override
public long extractTimestamp(String s, long prevElementTimestamp) {
return generateTimestamp(s,prevElementTimestamp);
}
//act'
private long generateTimestamp(String s, long prevElementTimestamp) {
System.out.println( "last element timestamp is " + prevElementTimestamp);
return Long.parseLong(s.split("-")[0]);
}
}
攝入時間是指 flink從資料源處接收到事件的時間,即事件進入flink的時間;依賴于Source Operator所在主機的系統時鐘,其具有一定的可預見性,因為一旦資料接入,時間生成之後就不會發生改變,也就是不會被後續資料處理Operator所在機器的時鐘所幹擾(不會因為某台機器時鐘不同步或者網絡時延而導緻計算結果不準确)。但是,攝入時間無法處理亂序事件(很顯然,可能是在産生源或者消息中間件向flink注入事件的過程中,由于網絡,背壓等問題導緻亂序的産生),是以也就沒有必要為後續算子處理生成相應的水位線。
同樣滴, 配置攝入時間的方式(TimeCharacteristic.IngestionTime):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
處理時間是指資料在計算過程中擷取到的本地時間,即所在主機的時間;flink預設使用的就是Processing Time,指定處理時間後,所有和時間相關的計算算子,例如windows,在目前任務中所有算子将直接使用其所在機器的系統時間。使用Processing Time的程式相對性能會比較高,延時也會比較低,計算依賴的時間都會在具體算子運作的過程中通過本地時間産生,不需要做任務時間上的協調和統一。缺點表現為無法對亂序事件做出預期對事件時間有要求的正确處理,即使事件傳輸過程中不存在亂序,分布式環境下,每台機器的時間如果不同步,也可能導緻資料處理過程中資料亂序的問題。
一圖說明時間類型之間的關系:

event time --> Ingestion Time–>Processing Time
flink的WaterMark是用來衡量目前事件的到達情況,一般通過TimeStamps生成(生成方式通常是用TimeStamps【事件時間戳】減去可以配置的固定時間)。當watermark到達某一個時間點時,通常我們可以認為在該時間點之前的資料均已到達,否則認為沒有到達的事件為遲到事件或者稱為異常事件。WaterMark與eventTime以及涉及到到時間的operator結合起來使用可以在一定程度上解決事件亂序或者延遲的問題(out of order || late element),是以,WaterMark必須要在第一個時間相關的算子執行前生成。
flink提供了兩種生成WaterMark的方式,第一種是直接在Source Function中生成,第二種是通過Flink自帶的TimeStampAssigner指定TimeStamp以及生成WaterMark。
通過建立SourceFunction的匿名類,覆寫run方法,在方法内部使用flink傳遞給我們的SourceContext,通過調用SourceContext的collectWithTimestamp生成事件的事件時間,調用emitWatermark生成相應時間的水位線。
這種方式不常用,當我們使用外部資料源連接配接器時,基本上是無法使用Source Function這種方式的。
TimeStampAssigner的優先級是高于Source Function的,也就是說同時指定,Source Function會被覆寫。Flink内部根據TimeStampAssigner的不同實作方式将其分為兩種:第一種是PeriodicWatermarkAssigner,根據設定的時間間隔周期性生成Watermark,用的比較多,使用時實作接口org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks覆寫相應方法即可;另一種是PunctuatedWatermarkAssigner,根據接入資料的數量生成相應的Watermark,使用時實作接口org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks覆寫相應的方法即可。
Flink内部提供了兩種PeriodicWatermarkAssigner,第一種為升序模式的生成方式,即根據固定字段提取時間戳而且用最新的時間戳作為水位線;第二種為固定時延的生成方式,通過設定固定時間間隔來指定WaterMark落後于TimeStamp的區間長度。我們可以直接拿來使用:
Ascending
OutOfOrderness
想要采用不同的内置抽取器隻需要建立相應的内部實作,簡單友善,其中固定時延的内置Assigner構造函數需要傳入允許的最長時延,然後在覆寫extractTimestamp指定EventTime的時間戳。
使用者自定義Timestamp&WatermakerAssigner
為了支援更加廣泛的應用場景,必然的Flink支援自定義的PeriodicWatermarkAssigner,實作org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks即可。AssignerWithPeriodicWatermarks繼承自TimestampAssigner,也就是說AssignerWithPeriodicWatermarks集抽取時間戳與生成水印能力與一身。
Demo如下:
List<Tuple3<String, Integer, Long>> source = Lists.newArrayList();
source.add(new Tuple3<>("qingh1", 1, 100L));
source.add(new Tuple3<>("qingh2", 2, 101L));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple3<String, Integer, Long>> dataStreamSource = env.fromCollection(source);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(10);
DataStream<Tuple3<String, Integer, Long>> result = dataStreamSource.assignTimestampsAndWatermarks(
new AssignerWithPeriodicWatermarks<Tuple3<String, Integer, Long>>() {
private Long maxOutOfOrderness = 100L;
private Long maxTimestamp = 0L;
@Nullable
@Override
public Watermark getCurrentWatermark() {
// 獲取水位綫
return new Watermark(maxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(Tuple3<String, Integer, Long> element, long previousElementTimestamp) {
// 抽取時間戳
Long currentTimestamp = element.f2;
maxTimestamp = Math.max(currentTimestamp, maxTimestamp);
return currentTimestamp;
}
}
).keyBy(0).timeWindow(Time.milliseconds(10)).sum(1);
result.print();
env.execute("qinghh Demo");
new AssignerWithPunctuatedWatermarks<Tuple3<String, Integer, Long>>() {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Tuple3<String, Integer, Long> lastElement, long extractedTimestamp) {
if (lastElement.f1 == 1){
return new Watermark(extractedTimestamp);
}
return null;
}
@Override
public long extractTimestamp(Tuple3<String, Integer, Long> element, long previousElementTimestamp) {
return element.f2;
}
}