
Flink Source Code Reading: The Difference Between AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks

For job submission and task execution, see the follow-up articles.


The entry point is OneInputStreamTask. Its init() method creates the StreamInputProcessor object:

public void init() throws Exception {
   StreamConfig configuration = getConfiguration();

   TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
   int numberOfInputs = configuration.getNumberOfInputs();

   if (numberOfInputs > 0) {
      InputGate[] inputGates = getEnvironment().getAllInputGates();

      inputProcessor = new StreamInputProcessor<>(
            inputGates,
            inSerializer,
            this,
            configuration.getCheckpointMode(),
            getCheckpointLock(),
            getEnvironment().getIOManager(),
            getEnvironment().getTaskManagerInfo().getConfiguration(),
            getStreamStatusMaintainer(),
            this.headOperator,
            getEnvironment().getMetricGroup().getIOMetricGroup(),
            inputWatermarkGauge);
   }
   headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
   getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}
           

The run() method then repeatedly calls StreamInputProcessor's processInput() method:

protected void run() throws Exception {
   // cache processor reference on the stack, to make the code more JIT friendly
   final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

   while (running && inputProcessor.processInput()) {
      // all the work happens in the "processInput" method
   }
}


public boolean processInput() throws Exception {
   if (isFinished) {
      return false;
   }
   if (numRecordsIn == null) {
      try {
         numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
      } catch (Exception e) {
         LOG.warn("An exception occurred during the metrics setup.", e);
         numRecordsIn = new SimpleCounter();
      }
   }

   while (true) {
      if (currentRecordDeserializer != null) {
         DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

         if (result.isBufferConsumed()) {
            currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
            currentRecordDeserializer = null;
         }

         if (result.isFullRecord()) {
            StreamElement recordOrMark = deserializationDelegate.getInstance();

            if (recordOrMark.isWatermark()) {
               // handle watermark
               statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
               continue;
            } else if (recordOrMark.isStreamStatus()) {
               // handle stream status
               statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
               continue;
            } else if (recordOrMark.isLatencyMarker()) {
               // handle latency marker
               synchronized (lock) {
                  streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
               }
               continue;
            } else {
               // now we can do the actual processing
               // from here on, regular data records are processed
               StreamRecord<IN> record = recordOrMark.asRecord();
               synchronized (lock) {
                  numRecordsIn.inc();
                  streamOperator.setKeyContextElement1(record);
                  streamOperator.processElement(record);
               }
               return true;
            }
         }
      }

      final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
      if (bufferOrEvent != null) {
         if (bufferOrEvent.isBuffer()) {
            currentChannel = bufferOrEvent.getChannelIndex();
            currentRecordDeserializer = recordDeserializers[currentChannel];
            currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
         }
         else {
            // Event received
            final AbstractEvent event = bufferOrEvent.getEvent();
            if (event.getClass() != EndOfPartitionEvent.class) {
               throw new IOException("Unexpected event: " + event);
            }
         }
      }
      else {
         isFinished = true;
         if (!barrierHandler.isEmpty()) {
            throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
         }
         return false;
      }
   }
}
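Note that a watermark read from an input channel is not passed straight to the operator; it goes through statusWatermarkValve.inputWatermark(...). As far as I can tell from the Flink 1.x source, the valve keeps the latest watermark per input channel and forwards the minimum across channels only when that minimum advances. The following is a simplified, Flink-free sketch of that rule. The class name MinWatermarkValve is hypothetical, and the sketch ignores stream status (idle/active channels), which the real StatusWatermarkValve also tracks.

```java
import java.util.Arrays;

// Hypothetical, simplified model of the "minimum across input channels" rule.
// Not Flink's StatusWatermarkValve: idle/active stream status is ignored here.
class MinWatermarkValve {
    private final long[] channelWatermarks;    // latest watermark seen on each channel
    private long lastEmitted = Long.MIN_VALUE; // last watermark forwarded downstream

    MinWatermarkValve(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark for one channel and returns the watermark to forward,
     * or null if the overall (minimum) watermark did not advance.
     */
    Long inputWatermark(long watermark, int channel) {
        // A channel's watermark never moves backwards.
        if (watermark > channelWatermarks[channel]) {
            channelWatermarks[channel] = watermark;
        }
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        // Forward only when the overall watermark strictly advances.
        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return null;
    }

    public static void main(String[] args) {
        MinWatermarkValve valve = new MinWatermarkValve(2);
        System.out.println(valve.inputWatermark(100, 0)); // null: channel 1 has no watermark yet
        System.out.println(valve.inputWatermark(50, 1));  // 50 = min(100, 50)
        System.out.println(valve.inputWatermark(80, 1));  // 80 = min(100, 80)
        System.out.println(valve.inputWatermark(70, 1));  // null: channel 1 cannot go back from 80
    }
}
```

With parallelism 1, as in the tests below, there is a single channel, so every advancing watermark passes through unchanged.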
           

Next, OneInputStreamOperator's processElement method is called. The interface has a number of implementing classes.


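The ones relevant here are TimestampsAndPunctuatedWatermarksOperator and TimestampsAndPeriodicWatermarksOperator, which DataStream.assignTimestampsAndWatermarks creates for a punctuated and a periodic assigner, respectively.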

First, TimestampsAndPunctuatedWatermarksOperator:

public void processElement(StreamRecord<T> element) throws Exception {
   final T value = element.getValue();
   final long newTimestamp = userFunction.extractTimestamp(value,
         element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

   output.collect(element.replace(element.getValue(), newTimestamp));

   final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
   if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
      currentWatermark = nextWatermark.getTimestamp();
      output.emitWatermark(nextWatermark);
   }
}
           

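It first calls the user-defined extractTimestamp method to obtain the timestamp. Before that call it checks whether the element already has a timestamp; if it does not, Long.MIN_VALUE is passed in instead: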

@Override
public long extractTimestamp(Tuple3<String, String, Long> element, long currentTimestamp) {
    System.out.println("=======WatermarkAssigner:" + currentTimestamp);
    try {
        long etlTime = element.f2;
        currentMaxTimestamp = Math.max(etlTime, currentMaxTimestamp);
        return etlTime;
    } catch (Exception e) {
        LOG.error("Failed to parse orderTime: " + element.toString(), e);
    }
    return 0L;
}
           

It then sets the element's timestamp to the returned event time and emits the element:

element.replace(element.getValue(), newTimestamp)

Next it calls the user-defined checkAndGetNextWatermark to get the next watermark, passing newTimestamp in:

@Override
public Watermark checkAndGetNextWatermark(Tuple3<String, String, Long> lastElement, long watermarkTimestamp) {
    System.out.println("========currentMaxTimestamp:" + currentMaxTimestamp);
    return new Watermark(currentMaxTimestamp - DELAY_TIME);
}
           

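So the first watermark is the first element's eventTime - DELAY_TIME, and from the second element on it is Math.max(eventTime, currentMaxTimestamp) - DELAY_TIME. The call order is extractTimestamp first, then checkAndGetNextWatermark, and each is called once per element. The operator only emits the returned watermark if it is strictly greater than the last watermark it emitted (nextWatermark.getTimestamp() > currentWatermark).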

Test code:

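import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WaterMarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // In this EventTime job the interval only drives periodic assigners; a punctuated assigner ignores it
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(1);
        // StreamDataSource is the author's own test source (not shown in this article)
        env.addSource(new StreamDataSource())
                .assignTimestampsAndWatermarks(new PunctuatedWatermarkAssigner())
                .print();
        env.execute();
    }
}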

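import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PunctuatedWatermarkAssigner implements AssignerWithPunctuatedWatermarks<Tuple3<String, String, Long>> {
    private static final Logger LOG = LoggerFactory.getLogger(PunctuatedWatermarkAssigner.class);
    private long currentMaxTimestamp = 0L;
    // watermark delay
    private static final long DELAY_TIME = 3 * 1000L;

    /**
     * Called second; watermarkTimestamp is the value returned by extractTimestamp().
     *
     * @param lastElement        the stream element
     * @param watermarkTimestamp the timestamp extracted for this element
     * @return the next watermark (the operator emits it only if it exceeds the current watermark)
     */
    @Override
    public Watermark checkAndGetNextWatermark(Tuple3<String, String, Long> lastElement, long watermarkTimestamp) {
        System.out.println("========currentwatermark:" + (currentMaxTimestamp - DELAY_TIME));
        return new Watermark(currentMaxTimestamp - DELAY_TIME);
    }

    /**
     * Called first; extracts the timestamp from the element.
     *
     * @param element          the stream element
     * @param currentTimestamp the element's existing timestamp, or Long.MIN_VALUE if it has none
     *                         (not the current system time)
     * @return the element's event-time timestamp; the time seen by a Trigger is also this value
     */
    @Override
    public long extractTimestamp(Tuple3<String, String, Long> element, long currentTimestamp) {
        System.out.println("=======WatermarkAssigner:" + currentTimestamp);
        try {
            long etlTime = element.f2;
            currentMaxTimestamp = Math.max(etlTime, currentMaxTimestamp);
            return etlTime;
        } catch (Exception e) {
            LOG.error("Failed to parse orderTime: " + element.toString(), e);
        }
        return 0L;
    }
}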
           

Result:

=======WatermarkAssigner:-9223372036854775808
(a,1,1551169050000)
========currentwatermark:1551169047000
=======WatermarkAssigner:-9223372036854775808
(aa,33,1551169064001)
========currentwatermark:1551169061001
=======WatermarkAssigner:-9223372036854775808
(a,2,1551169054000)
========currentwatermark:1551169061001
=======WatermarkAssigner:-9223372036854775808
(a,3,1551169064002)
========currentwatermark:1551169061002
=======WatermarkAssigner:-9223372036854775808
(b,5,1551169100000)
========currentwatermark:1551169097000
=======WatermarkAssigner:-9223372036854775808
(a,4,1551169079003)
========currentwatermark:1551169097000
=======WatermarkAssigner:-9223372036854775808
(aa,44,1551169079004)
========currentwatermark:1551169097000
=======WatermarkAssigner:-9223372036854775808
(b,6,1551169108000)
========currentwatermark:1551169105000
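These numbers follow directly from the operator logic. The Flink-free sketch below replays the eight event times from this run through the same three steps: extractTimestamp, then checkAndGetNextWatermark, then the `> currentWatermark` check in TimestampsAndPunctuatedWatermarksOperator. The class PunctuatedReplay is hypothetical. It also shows that a watermark printed by the assigner is not necessarily emitted: a repeated value such as 1551169061001 is dropped because it is not strictly greater than the last emitted watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of TimestampsAndPunctuatedWatermarksOperator.processElement
// combined with the PunctuatedWatermarkAssigner above, without Flink.
class PunctuatedReplay {
    static final long DELAY_TIME = 3 * 1000L;

    /** Returns the watermarks the operator would actually emit downstream. */
    static List<Long> emittedWatermarks(long[] eventTimes) {
        long currentMaxTimestamp = 0L;          // assigner state
        long currentWatermark = Long.MIN_VALUE; // operator state
        List<Long> emitted = new ArrayList<>();
        for (long eventTime : eventTimes) {
            // 1. extractTimestamp: track the max event time seen so far
            currentMaxTimestamp = Math.max(eventTime, currentMaxTimestamp);
            // 2. checkAndGetNextWatermark: called once per element
            long candidate = currentMaxTimestamp - DELAY_TIME;
            // 3. the operator only emits strictly increasing watermarks
            if (candidate > currentWatermark) {
                currentWatermark = candidate;
                emitted.add(candidate);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] eventTimes = {
            1551169050000L, 1551169064001L, 1551169054000L, 1551169064002L,
            1551169100000L, 1551169079003L, 1551169079004L, 1551169108000L
        };
        // prints [1551169047000, 1551169061001, 1551169061002, 1551169097000, 1551169105000]
        System.out.println(emittedWatermarks(eventTimes));
    }
}
```

Eight candidate watermarks are printed in the run above, but only five distinct, increasing ones would reach downstream operators.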
           

Now TimestampsAndPeriodicWatermarksOperator:

@Override
public void processElement(StreamRecord<T> element) throws Exception {
   final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
         element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

   output.collect(element.replace(element.getValue(), newTimestamp));
}
           

It only calls extractTimestamp; getCurrentWatermark is not called here.

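As before, it first checks whether the current element has a timestamp; if not, Long.MIN_VALUE is passed to extractTimestamp, and the returned event time becomes the record's timestamp. getCurrentWatermark is not called in processElement because watermarks are generated periodically: in the Flink version examined here, the operator registers a processing-time timer in open() using the configured auto-watermark interval, and each time the timer fires (onProcessingTime) it calls getCurrentWatermark, emits the result if it is greater than the current watermark, and registers the next timer.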
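That timer-driven flow can be modeled as a simple loop. In the Flink-free sketch below (the class PeriodicReplay and its parameters are hypothetical), records arrive at assumed processing times, and a "timer" fires every intervalMs; each firing does what onProcessingTime does, polling the equivalent of getCurrentWatermark and emitting the result only if it advanced. Assumptions: records arrive exactly at the listed processing times, timers fire exactly on schedule, and DELAY_TIME is 0 as in the PeriodicWatermarkAssigner of this test.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of TimestampsAndPeriodicWatermarksOperator: processElement only
// extracts timestamps, while a processing-time timer polls getCurrentWatermark().
class PeriodicReplay {
    static final long DELAY_TIME = 0L;

    /**
     * @param arrivalTimes processing time (ms) at which each record arrives, ascending
     * @param eventTimes   event time carried by each record
     * @param intervalMs   auto-watermark interval
     * @param endTime      processing time at which the simulation stops
     * @return watermarks emitted downstream, in order
     */
    static List<Long> emittedWatermarks(long[] arrivalTimes, long[] eventTimes,
                                        long intervalMs, long endTime) {
        long currentMaxTimestamp = 0L;          // assigner state
        long currentWatermark = Long.MIN_VALUE; // operator state
        List<Long> emitted = new ArrayList<>();
        int next = 0;                           // index of the next record to arrive
        for (long now = intervalMs; now <= endTime; now += intervalMs) {
            // processElement: handle records that arrived up to this timer tick
            while (next < arrivalTimes.length && arrivalTimes[next] <= now) {
                currentMaxTimestamp = Math.max(eventTimes[next], currentMaxTimestamp);
                next++;
            }
            // onProcessingTime: poll the assigner, emit only if the watermark advanced
            long candidate = currentMaxTimestamp - DELAY_TIME;
            if (candidate > currentWatermark) {
                currentWatermark = candidate;
                emitted.add(candidate);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] arrivals = {100, 1100, 2100};
        long[] eventTimes = {1551169050000L, 1551169064001L, 1551169054000L};
        // prints [1551169050000, 1551169064001]
        System.out.println(emittedWatermarks(arrivals, eventTimes, 200, 3000));
    }
}
```

Between two records the timer fires several times, which matches the repeated getCurrentWatermark lines in the output below; only the ticks where the maximum event time grew produce a new watermark. Like the assigner in this test, the model emits a watermark of 0 if a tick fires before the first record arrives, because currentMaxTimestamp starts at 0.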

Test code:

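import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WaterMarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(200L); // generate a watermark every 200 ms
        env.setParallelism(1);
        // StreamDataSource is the author's own test source (not shown in this article)
        env.addSource(new StreamDataSource())
                .assignTimestampsAndWatermarks(new PeriodicWatermarkAssigner())
                .print();
        env.execute();
    }
}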

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PeriodicWatermarkAssigner implements AssignerWithPeriodicWatermarks<Tuple3<String, String, Long>> {
    private static final Logger LOG = LoggerFactory.getLogger(PeriodicWatermarkAssigner.class);
    private long currentMaxTimestamp = 0L;
    // watermark delay (0 here, so the watermark equals the max event time seen so far)
    private static final long DELAY_TIME = 0 * 1000L;

    // Polled by the operator's processing-time timer every autoWatermarkInterval ms
    @Override
    public Watermark getCurrentWatermark() {
        Watermark watermark = new Watermark(currentMaxTimestamp - DELAY_TIME);
        System.out.println("currentMaxTimestamp:" + currentMaxTimestamp + " watermark: " + watermark.getTimestamp());
        return watermark;
    }

    // Called once per element; previousElementTimestamp is Long.MIN_VALUE if the element has no timestamp
    @Override
    public long extractTimestamp(Tuple3<String, String, Long> element, long previousElementTimestamp) {
        try {
            System.out.println("WatermarkAssigner:" + previousElementTimestamp);
            long etlTimestamp = element.f2;
            currentMaxTimestamp = Math.max(etlTimestamp, currentMaxTimestamp);
            return etlTimestamp;
        } catch (Exception e) {
            LOG.error("Failed to parse orderTime: " + element.toString(), e);
        }
        return 0L;
    }
}
           

Result:

WatermarkAssigner:-9223372036854775808
(a,1,1551169050000)
currentMaxTimestamp:1551169050000 watermark: 1551169050000
currentMaxTimestamp:1551169050000 watermark: 1551169050000
currentMaxTimestamp:1551169050000 watermark: 1551169050000
currentMaxTimestamp:1551169050000 watermark: 1551169050000
WatermarkAssigner:-9223372036854775808
(aa,33,1551169064001)
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
WatermarkAssigner:-9223372036854775808
(a,2,1551169054000)
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
WatermarkAssigner:-9223372036854775808
(a,3,1551169064002)
currentMaxTimestamp:1551169064002 watermark: 1551169064002
currentMaxTimestamp:1551169064002 watermark: 1551169064002
currentMaxTimestamp:1551169064002 watermark: 1551169064002
currentMaxTimestamp:1551169064002 watermark: 1551169064002
currentMaxTimestamp:1551169064002 watermark: 1551169064002
WatermarkAssigner:-9223372036854775808
(b,5,1551169100000)
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
WatermarkAssigner:-9223372036854775808
(a,4,1551169079003)
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
WatermarkAssigner:-9223372036854775808
(aa,44,1551169079004)
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
WatermarkAssigner:-9223372036854775808
(b,6,1551169108000)
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000
           

Why do the elements carry no timestamp to begin with, so that the value passed in is always Long.MIN_VALUE?

Checking a program actually running in production gives the same result:

====previousElementTimestamp:-9223372036854775808
orderid =004247789642,orderitemid = 00424778964207,etl_time = 2019-08-27 17:11:48,current wartermark = 2019-08-27 17:11:28
====previousElementTimestamp:-9223372036854775808
orderid =004247sss789642,orderitemid = 00424778964207,etl_time = 2019-08-27 17:11:48,current wartermark = 2019-08-27 17:11:28
====previousElementTimestamp:-9223372036854775808
orderid =004247aaas789642,orderitemid = 00424778964207,etl_time = 2019-08-27 17:11:48,current wartermark = 2019-08-27 17:11:28
           

So when does an element carry a timestamp?

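In this setup, only when the job runs with TimeCharacteristic.IngestionTime: the source then stamps each element with the time it entered the system. (Under EventTime, a source can still attach its own timestamp through SourceContext.collectWithTimestamp, but the source used here evidently does not.)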

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
           

With this setting, the result is:

=======WatermarkAssigner:1566899708826
(a,1,1551169050000)
========currentwatermark:1551169047000
=======WatermarkAssigner:1566899709850
(aa,33,1551169064001)
========currentwatermark:1551169061001
=======WatermarkAssigner:1566899710851
(a,2,1551169054000)
========currentwatermark:1551169061001
=======WatermarkAssigner:1566899711851
(a,3,1551169064002)
========currentwatermark:1551169061002
=======WatermarkAssigner:1566899712851
(b,5,1551169100000)
========currentwatermark:1551169097000
=======WatermarkAssigner:1566899713855
(a,4,1551169079003)
========currentwatermark:1551169097000
=======WatermarkAssigner:1566899714855
(aa,44,1551169079004)
========currentwatermark:1551169097000
=======WatermarkAssigner:1566899715855
(b,6,1551169108000)
========currentwatermark:1551169105000
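The rule behind this lies in the source context. As far as I understand the Flink 1.x source (StreamSourceContexts), a different SourceContext is chosen per time characteristic: with ProcessingTime, timestamps are dropped; with IngestionTime, each element is stamped with the current processing time (even if collectWithTimestamp is used); with EventTime, plain collect() attaches nothing and only collectWithTimestamp() sets a timestamp. The sketch below is a simplified, Flink-free model of that rule; the enum, timestampFor, and previousElementTimestamp are hypothetical names, the last one mirroring the `hasTimestamp() ? getTimestamp() : Long.MIN_VALUE` fallback in the operators above.

```java
import java.util.OptionalLong;

// Hypothetical model of how a Flink 1.x source context decides an element's timestamp.
class SourceTimestampModel {
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    /**
     * @param characteristic        the job's time characteristic
     * @param sourceTimestamp       value passed via collectWithTimestamp, or empty for plain collect()
     * @param currentProcessingTime wall-clock time when the source emits the element
     * @return the timestamp carried by the element, or empty if it has none
     */
    static OptionalLong timestampFor(TimeCharacteristic characteristic,
                                     OptionalLong sourceTimestamp,
                                     long currentProcessingTime) {
        switch (characteristic) {
            case ProcessingTime:
                return OptionalLong.empty();                   // timestamps are dropped
            case IngestionTime:
                return OptionalLong.of(currentProcessingTime); // always the ingestion time
            case EventTime:
                return sourceTimestamp;                        // only what the source attached
            default:
                throw new IllegalArgumentException("unknown: " + characteristic);
        }
    }

    // What the timestamp assigner receives as its second argument.
    static long previousElementTimestamp(OptionalLong timestamp) {
        return timestamp.orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long now = 1566899708826L;
        for (TimeCharacteristic tc : TimeCharacteristic.values()) {
            // plain collect(), i.e. the source attaches no timestamp of its own
            System.out.println(tc + " -> " + previousElementTimestamp(timestampFor(tc, OptionalLong.empty(), now)));
        }
        // prints:
        // ProcessingTime -> -9223372036854775808
        // IngestionTime -> 1566899708826
        // EventTime -> -9223372036854775808
    }
}
```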
           

Summary:

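AssignerWithPeriodicWatermarks generates watermarks periodically, at an interval configured with setAutoWatermarkInterval; each watermark is derived from the event times of the data seen so far.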

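AssignerWithPunctuatedWatermarks does not generate watermarks on a timer; checkAndGetNextWatermark is called once per element, so the watermark is updated only as elements arrive, based on their eventTime.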

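With ProcessingTime, elements never carry a timestamp; with EventTime, they carry one only if the source attaches it (SourceContext.collectWithTimestamp), which the source used here does not. Only IngestionTime automatically stamps each element with the time it entered the system.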
