0x1 摘要
大家都知道Flink引入了事件時間(eventTime)這個重要概念,來提升資料統計的準确性,但引入事件時間後在具體業務實作時存在一些問題必需要合理去解決,否則會造成非常嚴重的問題。
0x2 Flink 時間概念介紹
Flink 支援不同的時間概念,包括:
- Event Time :事件時間
- Processing Time :處理時間
- Ingestion Time :消息提取時間
參考下圖可以清晰的知道這三者的關系:
Ingestion Time
是介于
Event Time
和
Processing Time
之間的概念。
程式中可以通過
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
指定使用時間類型。
0x3 事件時間存在的問題
事件時間存在什麼樣的問題呢?下面先看一個簡單的業務場景。
比如:要統計APP上搜尋按鈕每1分鐘的點選次數。
前端埋點資料結構:
字段名 | 字段類型 | 描述 |
---|---|---|
eventCode | String | 事件編碼 |
clickTime | Long | 點選時間 |
基于以上資料結構我們可設計如下水印處理器:
public static class TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
private long currentMaxTimestamp = 0L;
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp -3000);
}
@Override
public long extractTimestamp(Tuple2<String, Long> tuple, long previousElementTimestamp) {
long eventTime = tuple.f1;
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
return eventTime;
}
}
extractTimestamp
方法會拿事件時間和上一次事件時間比較,并取較大值來更新目前水印值。
假設前端發送了以下這些資料,友善直覺看資料clickTime直接采用格式化後的值,并以逗号分隔資料。
001,2018-12-17 13:30:00
001,2018-12-17 13:30:01
001,2018-12-17 13:30:02
001,2018-12-18 13:30:00
001,2018-12-17 13:30:03
001,2018-12-17 13:30:04
001,2018-12-17 13:30:05
正常資料都是17号,突然來了一條18号的資料,再結合上面的水印邏輯,一旦出現這種問題資料,直接導緻水位上升到18号,後面再來17号的資料全部無法處理。針對業務來講這樣的錯誤是緻命的,統計結果出現斷層。
0x4 解決思路
針對以上問題我們可以對水印實作類做如下改造:
public static class TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
private long currentMaxTimestamp = 0L;
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp -3000);
}
@Override
public long extractTimestamp(Tuple2<String, Long> tuple, long previousElementTimestamp) {
long eventTime = tuple.f1;
if((currentMaxTimestamp == 0) || (eventTime - currentMaxTimestamp < MESSAGE_FORWARD_TIME)) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
}
return eventTime;
}
}
MESSAGE_FORWARD_TIME
變量是自定義的消息最大跳躍時間,如果超出這個範圍則不更新最大水印時間。