天天看點

Flink 事件時間的陷進及解決思路

0x1 摘要

大家都知道Flink引入了事件時間(eventTime)這個重要概念,來提升資料統計的準确性,但引入事件時間後在具體業務實作時存在一些問題必需要合理去解決,否則會造成非常嚴重的問題。

0x2 Flink 時間概念介紹

Flink 支援不同的時間概念,包括:

  • Event Time :事件時間
  • Processing Time :處理時間
  • Ingestion Time :消息提取時間

參考下圖可以清晰的知道這三者的關系:

Flink 事件時間的陷進及解決思路

Ingestion Time

是介于

Event Time

Processing Time

之間的概念。

程式中可以通過

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

指定使用時間類型。

0x3 事件時間存在的問題

事件時間存在什麼樣的問題呢?下面先看一個簡單的業務場景。

比如:要統計APP上搜尋按鈕每1分鐘的點選次數。

前端埋點資料結構:

字段名 字段類型 描述
eventCode String 事件編碼
clickTime Long 點選時間

基于以上資料結構我們可設計如下水印處理器:

public static class TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

 private long currentMaxTimestamp = 0L;

 @Override
 public Watermark getCurrentWatermark() {
  return new Watermark(currentMaxTimestamp -3000);
 }

 @Override
 public long extractTimestamp(Tuple2<String, Long> tuple, long previousElementTimestamp) {
  long eventTime = tuple.f1;
  currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
  return eventTime;
 }
}           

extractTimestamp

方法會拿事件時間和上一次事件時間比較,并取較大值來更新目前水印值。

假設前端發送了以下這些資料,友善直覺看資料clickTime直接采用格式化後的值,并以逗号分隔資料。

001,2018-12-17 13:30:00
001,2018-12-17 13:30:01
001,2018-12-17 13:30:02
001,2018-12-18 13:30:00
001,2018-12-17 13:30:03
001,2018-12-17 13:30:04
001,2018-12-17 13:30:05           

正常資料都是17号,突然來了一條18号的資料,再結合上面的水印邏輯,一旦出現這種問題資料,直接導緻水位上升到18号,後面再來17号的資料全部無法處理。針對業務來講這樣的錯誤是緻命的,統計結果出現斷層。

0x4 解決思路

針對以上問題我們可以對水印實作類做如下改造:

public static class TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

 private long currentMaxTimestamp = 0L;

 @Override
 public Watermark getCurrentWatermark() {
  return new Watermark(currentMaxTimestamp -3000);
 }

 @Override
 public long extractTimestamp(Tuple2<String, Long> tuple, long previousElementTimestamp) {
  long eventTime = tuple.f1;
  if((currentMaxTimestamp == 0) || (eventTime - currentMaxTimestamp < MESSAGE_FORWARD_TIME)) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
        }
  return eventTime;
 }
}           

MESSAGE_FORWARD_TIME

變量是自定義的消息最大跳躍時間,如果超出這個範圍則不更新最大水印時間。

繼續閱讀