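Flink source code reading: the difference between AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks
For how jobs are submitted and how tasks are executed, see the follow-up articles.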
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPn5kMBR0T3lFVNBDOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLwgTN3IzN1kDM4IDOwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
Starting from the OneInputStreamTask entry point, the init() method initializes the StreamInputProcessor object:
public void init() throws Exception {
StreamConfig configuration = getConfiguration();
TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
int numberOfInputs = configuration.getNumberOfInputs();
if (numberOfInputs > 0) {
InputGate[] inputGates = getEnvironment().getAllInputGates();
inputProcessor = new StreamInputProcessor<>(
inputGates,
inSerializer,
this,
configuration.getCheckpointMode(),
getCheckpointLock(),
getEnvironment().getIOManager(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
getStreamStatusMaintainer(),
this.headOperator,
getEnvironment().getMetricGroup().getIOMetricGroup(),
inputWatermarkGauge);
}
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}
The run() method calls StreamInputProcessor's processInput() method in a loop, and processInput() dispatches each deserialized element by type:
protected void run() throws Exception {
// cache processor reference on the stack, to make the code more JIT friendly
final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
while (running && inputProcessor.processInput()) {
// all the work happens in the "processInput" method
}
}
public boolean processInput() throws Exception {
if (isFinished) {
return false;
}
if (numRecordsIn == null) {
try {
numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
numRecordsIn = new SimpleCounter();
}
}
while (true) {
if (currentRecordDeserializer != null) {
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
if (result.isFullRecord()) {
StreamElement recordOrMark = deserializationDelegate.getInstance();
if (recordOrMark.isWatermark()) {
// handle watermark
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
continue;
} else if (recordOrMark.isStreamStatus()) {
// handle stream status
statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
continue;
} else if (recordOrMark.isLatencyMarker()) {
// handle latency marker
synchronized (lock) {
streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
}
continue;
} else {
// now we can do the actual processing
// from here on, regular data records are processed
StreamRecord<IN> record = recordOrMark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
streamOperator.setKeyContextElement1(record);
streamOperator.processElement(record);
}
return true;
}
}
}
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
if (bufferOrEvent != null) {
if (bufferOrEvent.isBuffer()) {
currentChannel = bufferOrEvent.getChannelIndex();
currentRecordDeserializer = recordDeserializers[currentChannel];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
}
else {
// Event received
final AbstractEvent event = bufferOrEvent.getEvent();
if (event.getClass() != EndOfPartitionEvent.class) {
throw new IOException("Unexpected event: " + event);
}
}
}
else {
isFinished = true;
if (!barrierHandler.isEmpty()) {
throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
}
return false;
}
}
}
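To make the control flow of processInput() easier to follow, here is a stripped-down sketch in plain Java. It assumes a single input channel and ignores stream status, latency markers, deserialization and checkpoint barriers; InputDispatchSketch and Element are hypothetical names, not Flink classes. It only shows the shape of the loop: watermarks are handled and the loop continues, while a data record ends the call with true.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical single-channel model of StreamInputProcessor#processInput(); not Flink code.
public class InputDispatchSketch {

    // Hypothetical stand-in for Flink's StreamElement: either a data record or a watermark.
    static final class Element {
        final boolean isWatermark;
        final long value; // record payload, or the watermark timestamp

        private Element(boolean isWatermark, long value) {
            this.isWatermark = isWatermark;
            this.value = value;
        }

        static Element record(long value) { return new Element(false, value); }

        static Element watermark(long timestamp) { return new Element(true, timestamp); }
    }

    final List<Long> processedRecords = new ArrayList<>();
    long lastWatermark = Long.MIN_VALUE;

    // Like processInput(): handle watermarks and keep looping; return true after one record,
    // false once the input is exhausted.
    boolean processInput(Iterator<Element> input) {
        while (input.hasNext()) {
            Element e = input.next();
            if (e.isWatermark) {
                // Stand-in for statusWatermarkValve.inputWatermark(...): with one channel,
                // only an advancing watermark matters.
                lastWatermark = Math.max(lastWatermark, e.value);
                continue;
            }
            // Stand-in for streamOperator.processElement(record).
            processedRecords.add(e.value);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        InputDispatchSketch task = new InputDispatchSketch();
        Iterator<Element> input = Arrays.asList(
                Element.record(1), Element.watermark(10),
                Element.record(2), Element.watermark(20)).iterator();
        int calls = 0;
        // Mirrors run(): all the work happens inside processInput().
        while (task.processInput(input)) {
            calls++;
        }
        // prints "2 records, [1, 2], watermark 20"
        System.out.println(calls + " records, " + task.processedRecords + ", watermark " + task.lastWatermark);
    }
}
```

The trailing watermark (20) is still consumed by the final call, which then returns false, just as the real method returns false once the input gates are exhausted.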
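Next, the processElement() method of the OneInputStreamOperator is called. Of its implementation classes, the ones commonly used for watermarks are TimestampsAndPunctuatedWatermarksOperator and TimestampsAndPeriodicWatermarksOperator.
First, TimestampsAndPunctuatedWatermarksOperator: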
public void processElement(StreamRecord<T> element) throws Exception {
final T value = element.getValue();
final long newTimestamp = userFunction.extractTimestamp(value,
element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
output.collect(element.replace(element.getValue(), newTimestamp));
final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
currentWatermark = nextWatermark.getTimestamp();
output.emitWatermark(nextWatermark);
}
}
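It first calls the user-defined extractTimestamp() method to get the timestamp. Before that, it checks whether the element already has a timestamp; if not, it passes in Long.MIN_VALUE: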
@Override
public long extractTimestamp(Tuple3<String, String, Long> element, long currentTimestamp) {
System.out.println("=======WatermarkAssigner:" + currentTimestamp);
try {
long etlTime = element.f2;
currentMaxTimestamp = Math.max(etlTime, currentMaxTimestamp);
return etlTime;
} catch (Exception e) {
LOG.error("the orderTime parse fail! " + element.toString(), e);
}
return 0L;
}
It then sets the returned eventTime as the element's timestamp:
element.replace(element.getValue(), newTimestamp)
Then it calls the user-defined checkAndGetNextWatermark() to get the next watermark, passing in newTimestamp:
@Override
public Watermark checkAndGetNextWatermark(Tuple3<String, String, Long> lastElement, long watermarkTimestamp) {
System.out.println("========currentMaxTimestamp:" + currentMaxTimestamp);
return new Watermark(currentMaxTimestamp - DELAY_TIME);
}
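So the first watermark is the first element's eventTime - DELAY_TIME; from the second element on, it is Math.max(eventTime, currentMaxTimestamp) - DELAY_TIME. extractTimestamp is called first and checkAndGetNextWatermark second, once each per element. The operator forwards the returned watermark downstream only if it is greater than the current watermark, so for (a,2,1551169054000) in the output below, the watermark 1551169061001 is computed again but not emitted again.
Test code: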
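Put as a formula, as a restatement of the assigner code below: let t_i be the event time (f2) of the i-th element, with currentMaxTimestamp starting at 0 as in that code. W_n is the watermark returned after the n-th element, and the emission condition mirrors the `nextWatermark.getTimestamp() > currentWatermark` check in processElement().

```latex
W_n = \max\Bigl(0,\ \max_{1 \le i \le n} t_i\Bigr) - \mathrm{DELAY\_TIME},
\qquad W_n \text{ is emitted} \iff W_n > W_{n-1}, \qquad W_0 = -\infty
```

For example, t_1 = 1551169050000 and DELAY_TIME = 3000 give W_1 = 1551169047000, the first watermark in the output. Because W_n never decreases, the last emitted watermark is always W_{n-1}.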
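import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class WaterMarkTest {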
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(200L);
env.setParallelism(1);
env.addSource(new StreamDataSource()).assignTimestampsAndWatermarks(new PunctuatedWatermarkAssigner()).print();
env.execute();
}
}
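import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PunctuatedWatermarkAssigner implements AssignerWithPunctuatedWatermarks<Tuple3<String, String, Long>> {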
private static final Logger LOG = LoggerFactory.getLogger(PunctuatedWatermarkAssigner.class);
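private long currentMaxTimestamp = 0L;
// watermark delay: how far the watermark lags behind the largest event time seen
private static final long DELAY_TIME = 3 * 1000L;
/**
* Called second; watermarkTimestamp is the value returned by extractTimestamp().
*
* @param lastElement the stream element
* @param watermarkTimestamp the timestamp returned by extractTimestamp() for this element
* @return the candidate watermark; the operator emits it only if it is greater than the current watermark
*/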
@Override
public Watermark checkAndGetNextWatermark(Tuple3<String, String, Long> lastElement, long watermarkTimestamp) {
System.out.println("========currentwatermark:" + (currentMaxTimestamp-DELAY_TIME));
return new Watermark(currentMaxTimestamp - DELAY_TIME);
}
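/**
* Called first; extracts the timestamp from the element.
*
* @param element the stream element
* @param currentTimestamp the timestamp already attached to the element, or Long.MIN_VALUE if it has none (not the current system time)
* @return the element's event timestamp; this is also the time that a Trigger sees
*/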
@Override
public long extractTimestamp(Tuple3<String, String, Long> element, long currentTimestamp) {
System.out.println("=======WatermarkAssigner:" + currentTimestamp);
try {
long etlTime = element.f2;
currentMaxTimestamp = Math.max(etlTime, currentMaxTimestamp);
return etlTime;
} catch (Exception e) {
LOG.error("the orderTime parse fail! " + element.toString(), e);
}
return 0L;
}
}
Output:
=======WatermarkAssigner:-9223372036854775808
(a,1,1551169050000)
========currentwatermark:1551169047000
=======WatermarkAssigner:-9223372036854775808
(aa,33,1551169064001)
========currentwatermark:1551169061001
=======WatermarkAssigner:-9223372036854775808
(a,2,1551169054000)
========currentwatermark:1551169061001
=======WatermarkAssigner:-9223372036854775808
(a,3,1551169064002)
========currentwatermark:1551169061002
=======WatermarkAssigner:-9223372036854775808
(b,5,1551169100000)
========currentwatermark:1551169097000
=======WatermarkAssigner:-9223372036854775808
(a,4,1551169079003)
========currentwatermark:1551169097000
=======WatermarkAssigner:-9223372036854775808
(aa,44,1551169079004)
========currentwatermark:1551169097000
=======WatermarkAssigner:-9223372036854775808
(b,6,1551169108000)
========currentwatermark:1551169105000
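The output above prints a watermark for every element, but the operator forwards only those that advance. The following plain-Java sketch replays the same event times through a model of TimestampsAndPunctuatedWatermarksOperator plus the assigner above, to show which watermarks actually go downstream. PunctuatedSketch is a hypothetical class; it does not use Flink, and record emission is reduced to a comment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of TimestampsAndPunctuatedWatermarksOperator driving the article's
// PunctuatedWatermarkAssigner; plain Java, not Flink code.
public class PunctuatedSketch {

    static final long DELAY_TIME = 3 * 1000L; // same delay as PunctuatedWatermarkAssigner

    private long currentMaxTimestamp = 0L;          // assigner state
    private long currentWatermark = Long.MIN_VALUE; // operator state: last emitted watermark
    final List<Long> emittedWatermarks = new ArrayList<>();

    long extractTimestamp(long eventTime) {
        currentMaxTimestamp = Math.max(eventTime, currentMaxTimestamp);
        return eventTime;
    }

    // extractedTimestamp is unused, as in the article's assigner.
    long checkAndGetNextWatermark(long extractedTimestamp) {
        return currentMaxTimestamp - DELAY_TIME;
    }

    // Mirrors processElement(): extract, emit the record, then emit the watermark only if it advances.
    void processElement(long eventTime) {
        long newTimestamp = extractTimestamp(eventTime);
        // output.collect(record with newTimestamp) would happen here.
        long next = checkAndGetNextWatermark(newTimestamp);
        if (next > currentWatermark) {
            currentWatermark = next;
            emittedWatermarks.add(next);
        }
    }

    public static void main(String[] args) {
        // Event times (f2) from the test run above, in arrival order.
        long[] eventTimes = {1551169050000L, 1551169064001L, 1551169054000L, 1551169064002L,
                1551169100000L, 1551169079003L, 1551169079004L, 1551169108000L};
        PunctuatedSketch op = new PunctuatedSketch();
        for (long t : eventTimes) {
            op.processElement(t);
        }
        // prints "[1551169047000, 1551169061001, 1551169061002, 1551169097000, 1551169105000]"
        System.out.println(op.emittedWatermarks);
    }
}
```

Eight elements produce eight candidate watermarks but only five emissions: the late elements (a,2), (a,4) and (aa,44) do not move the watermark.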
Next, TimestampsAndPeriodicWatermarksOperator:
@Override
public void processElement(StreamRecord<T> element) throws Exception {
final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
output.collect(element.replace(element.getValue(), newTimestamp));
}
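Only extractTimestamp is called here, not getCurrentWatermark.
As before, it first checks whether the current element has a timestamp; if not, it passes Long.MIN_VALUE to extractTimestamp, and the returned eventTime becomes the record's timestamp. getCurrentWatermark is not called here because the watermark is generated periodically: in this Flink version, the operator registers a processing-time timer every autoWatermarkInterval milliseconds, and its onProcessingTime() callback calls getCurrentWatermark() and emits the result only if it is greater than the current watermark.
Test code: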
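import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class WaterMarkTest {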
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(200L); // generate a watermark every 200 ms
env.setParallelism(1);
env.addSource(new StreamDataSource()).assignTimestampsAndWatermarks(new PeriodicWatermarkAssigner()).print();
env.execute();
}
}
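import java.text.SimpleDateFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PeriodicWatermarkAssigner implements AssignerWithPeriodicWatermarks<Tuple3<String, String, Long>> {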
private static final Logger LOG = LoggerFactory.getLogger(PeriodicWatermarkAssigner.class);
private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private long currentMaxTimestamp = 0L;
// watermark delay: how far the watermark lags behind the largest event time seen
private static final long DELAY_TIME = 0 * 1000L;
private Watermark watermark = null;
@Override
public Watermark getCurrentWatermark() {
watermark = new Watermark(currentMaxTimestamp - DELAY_TIME);
System.out.println("currentMaxTimestamp:" + currentMaxTimestamp + " watermark: " + watermark.getTimestamp());
return watermark;
}
@Override
public long extractTimestamp(Tuple3<String, String, Long> element, long l) {
try {
System.out.println("WatermarkAssigner:" + l);
long etlTimestamp = element.f2;
currentMaxTimestamp = Math.max(etlTimestamp, currentMaxTimestamp);
return etlTimestamp;
} catch (Exception e) {
LOG.error("the orderTime parse fail! " + element.toString(), e);
}
return 0L;
}
}
Output:
WatermarkAssigner:-9223372036854775808
(a,1,1551169050000)
currentMaxTimestamp:1551169050000 watermark: 1551169050000
currentMaxTimestamp:1551169050000 watermark: 1551169050000
currentMaxTimestamp:1551169050000 watermark: 1551169050000
currentMaxTimestamp:1551169050000 watermark: 1551169050000
WatermarkAssigner:-9223372036854775808
(aa,33,1551169064001)
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
WatermarkAssigner:-9223372036854775808
(a,2,1551169054000)
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
WatermarkAssigner:-9223372036854775808
(a,3,1551169064002)
currentMaxTimestamp:1551169064002 watermark: 1551169064002
currentMaxTimestamp:1551169064002 watermark: 1551169064002
currentMaxTimestamp:1551169064002 watermark: 1551169064002
currentMaxTimestamp:1551169064002 watermark: 1551169064002
currentMaxTimestamp:1551169064002 watermark: 1551169064002
WatermarkAssigner:-9223372036854775808
(b,5,1551169100000)
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
WatermarkAssigner:-9223372036854775808
(a,4,1551169079003)
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
WatermarkAssigner:-9223372036854775808
(aa,44,1551169079004)
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
WatermarkAssigner:-9223372036854775808
(b,6,1551169108000)
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000
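The repeated lines come from the timer calling getCurrentWatermark() every 200 ms between elements. The plain-Java sketch below models that on a simulated processing-time clock. PeriodicSketch and its run() method are hypothetical, and the arrival times (one element per second) are an assumption, suggested by the roughly one-second spacing of the ingestion timestamps shown later in this article.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of TimestampsAndPeriodicWatermarksOperator driving the article's
// PeriodicWatermarkAssigner on a simulated processing-time clock; not Flink code.
public class PeriodicSketch {

    static final long DELAY_TIME = 0L; // same as the PeriodicWatermarkAssigner above

    private long currentMaxTimestamp = 0L;          // assigner state
    private long currentWatermark = Long.MIN_VALUE; // operator state: last emitted watermark
    int getCurrentWatermarkCalls = 0;
    final List<Long> emittedWatermarks = new ArrayList<>();

    // Called per element from processElement(): only tracks the max event time.
    long extractTimestamp(long eventTime) {
        currentMaxTimestamp = Math.max(eventTime, currentMaxTimestamp);
        return eventTime;
    }

    // Called only from the timer, never from processElement().
    long getCurrentWatermark() {
        getCurrentWatermarkCalls++;
        return currentMaxTimestamp - DELAY_TIME;
    }

    // Timer callback: ask the assigner, emit only if the watermark advances.
    void onTimer() {
        long wm = getCurrentWatermark();
        if (wm > currentWatermark) {
            currentWatermark = wm;
            emittedWatermarks.add(wm);
        }
    }

    // arrivalMs[i] is the simulated processing time at which element i arrives;
    // the timer fires every intervalMs up to endMs.
    void run(long[] arrivalMs, long[] eventTimes, long intervalMs, long endMs) {
        int next = 0;
        for (long now = intervalMs; now <= endMs; now += intervalMs) {
            // Deliver every element that arrived before this firing.
            while (next < arrivalMs.length && arrivalMs[next] < now) {
                extractTimestamp(eventTimes[next]);
                next++;
            }
            onTimer();
        }
    }

    public static void main(String[] args) {
        PeriodicSketch op = new PeriodicSketch();
        op.run(new long[] {0L, 1000L, 2000L},
                new long[] {1551169050000L, 1551169064001L, 1551169054000L},
                200L, 3000L);
        // prints "15 calls, emitted [1551169050000, 1551169064001]"
        System.out.println(op.getCurrentWatermarkCalls + " calls, emitted " + op.emittedWatermarks);
    }
}
```

Fifteen timer firings call getCurrentWatermark(), yet only two watermarks go downstream; the late element (a,2) never advances the watermark, matching the unchanged 1551169064001 lines in the output.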
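Why do the elements carry no timestamp of their own, so that Long.MIN_VALUE is always passed in?
Checking with a program running in production gives the same result: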
====previousElementTimestamp:-9223372036854775808
orderid =004247789642,orderitemid = 00424778964207,etl_time = 2019-08-27 17:11:48,current wartermark = 2019-08-27 17:11:28
====previousElementTimestamp:-9223372036854775808
orderid =004247sss789642,orderitemid = 00424778964207,etl_time = 2019-08-27 17:11:48,current wartermark = 2019-08-27 17:11:28
====previousElementTimestamp:-9223372036854775808
orderid =004247aaas789642,orderitemid = 00424778964207,etl_time = 2019-08-27 17:11:48,current wartermark = 2019-08-27 17:11:28
So when does an element carry a timestamp?
In this setup, only when processing with TimeCharacteristic.IngestionTime: each element is then stamped with the time it entered the system.
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
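With this setting, the output is as follows. extractTimestamp now receives each element's ingestion time, while the watermarks are unchanged because the assigner derives them from f2: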
=======WatermarkAssigner:1566899708826
(a,1,1551169050000)
========currentwatermark:1551169047000
=======WatermarkAssigner:1566899709850
(aa,33,1551169064001)
========currentwatermark:1551169061001
=======WatermarkAssigner:1566899710851
(a,2,1551169054000)
========currentwatermark:1551169061001
=======WatermarkAssigner:1566899711851
(a,3,1551169064002)
========currentwatermark:1551169061002
=======WatermarkAssigner:1566899712851
(b,5,1551169100000)
========currentwatermark:1551169097000
=======WatermarkAssigner:1566899713855
(a,4,1551169079003)
========currentwatermark:1551169097000
=======WatermarkAssigner:1566899714855
(aa,44,1551169079004)
========currentwatermark:1551169097000
=======WatermarkAssigner:1566899715855
(b,6,1551169108000)
========currentwatermark:1551169105000
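Which timestamp reaches extractTimestamp depends on the source context that Flink picks for the time characteristic (NonTimestampContext for ProcessingTime, AutomaticWatermarkContext for IngestionTime, ManualWatermarkContext for EventTime). The plain-Java sketch below models that choice as a single function; the enum and timestampSeenDownstream() are hypothetical, and modeling "source attached a timestamp" as a nullable Long (non-null meaning the source called SourceContext#collectWithTimestamp) is an assumption for illustration.

```java
// Hypothetical model of how the source context decides an element's timestamp; not Flink code.
public class SourceTimestampSketch {

    // Local stand-in mirroring the constants of Flink's TimeCharacteristic.
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    // What the operator substitutes when a record has no timestamp.
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // sourceTimestamp is null when the source used collect(), non-null for collectWithTimestamp().
    static long timestampSeenDownstream(TimeCharacteristic tc, Long sourceTimestamp, long ingestionTimeMs) {
        switch (tc) {
            case ProcessingTime:
                // NonTimestampContext: any timestamp is dropped.
                return NO_TIMESTAMP;
            case IngestionTime:
                // AutomaticWatermarkContext: always stamped with the ingestion time.
                return ingestionTimeMs;
            case EventTime:
                // ManualWatermarkContext: only what the source attached itself.
                return sourceTimestamp == null ? NO_TIMESTAMP : sourceTimestamp;
            default:
                throw new IllegalArgumentException("unknown: " + tc);
        }
    }

    public static void main(String[] args) {
        long now = 1566899708826L; // first ingestion time from the output above
        for (TimeCharacteristic tc : TimeCharacteristic.values()) {
            // prints "ProcessingTime -> -9223372036854775808", "IngestionTime -> 1566899708826",
            // "EventTime -> -9223372036854775808"
            System.out.println(tc + " -> " + timestampSeenDownstream(tc, null, now));
        }
    }
}
```

With a source that only calls collect(), as the test source here presumably does, EventTime and ProcessingTime both end up at Long.MIN_VALUE, which matches the outputs above.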
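Summary:
AssignerWithPeriodicWatermarks generates watermarks periodically, at an interval configured with setAutoWatermarkInterval, and updates the watermark from the data's eventTime.
AssignerWithPunctuatedWatermarks does not generate watermarks periodically; it computes a new watermark after every element, based only on that element's eventTime.
With EventTime and ProcessingTime, elements carry no timestamp of their own (under EventTime, unless the source attaches one itself via SourceContext#collectWithTimestamp); only with IngestionTime are they stamped with the time they entered the system.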