flink 1.8
Generating Timestamps / Watermarks
本节主要介绍了基于事件时间event time的运行程序。有关事件时间 event time、处理时间processing time和摄入时间ingestion time的介绍,请参阅事件时间介绍introduction to event time。
为了处理事件时间event time,流程序需要设置相应地时间特征time characteristic。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Assigning Timestamps分配时间戳
为了处理事件时间event time,Flink需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过从元素中的某个字段中访问/提取时间戳来实现。
时间戳分配与生成水印关系密切,水印衡量系统事件时间event time的进展情况。
有两种方法来分配时间戳和生成水印:
- 直接在数据流源中
- 通过时间戳分配器/水印生成器:在Flink中,时间戳分配器定义了要发出的水印
注意,从1970-01-01T00:00:00Z是Java纪元开始,时间戳和水印都指定为毫秒。
Source Functions with Timestamps and Watermarks
sources流可以直接为它们生成的元素分配时间戳,还可以发出水印,这种应用场景下的程序是不需要时间戳分配器的。注意,如果使用时间戳分配器,则会覆盖sources提供的时间戳和水印。
要直接为源source中的元素分配时间戳,source必须使用SourceContext类中collectWithTimestamp(...) 方法。要生成水印,Source必须调用emitWatermark(Watermark)函数。
下面是一个源source分配时间戳和生成水印的(非检查点non-checkpointed)的简单示例:
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
public class SourceContextDemo implements SourceFunction {
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
while (/* condition */) {
MyType next = getNext();
ctx.collectWithTimestamp(next, next.getEventTimestamp());
if (next.hasWatermarkTime()) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
}
}
}
@Override
public void cancel() {
}
}
Timestamp Assigners / Watermark Generators
时间戳分配器assigner获取一个流并生成一个带有时间戳的元素和水印的新流。如果原始流已经具有时间戳和/或水印,则时间戳分配者将覆盖它们。
时间戳分配器在数据源data sourc之后立即指定,但并不严格要求这样做。例如,一个常见的模式是在时间戳分配器之前解析(MapFunction)和过滤(FilterFunction)。无论在任何情况下,通常必须在有关事件时间event time的第一个操作算子之前指定时间戳分配器(例如第一个窗口操作window operation)(否则,窗口将无法被触发)。作为一种特殊情况,当使用Kafka作为流作业streaming job的source时,Flink允许在源source(或consumer消费者)内部指定时间戳分配器assigner/水印发射器watermark emitter。有关如何做到这一点的更多信息可以在Kafka连接器文档Kafka Connector documentation中找到。
注意:本节的其余部分介绍了程序员必须实现的主要接口,以便创建自己的时间戳提取器/水印发射器。要查看附带Flink的预实现提取器,请参阅Pre-defined Timestamp Extractors / Watermark Emitters。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.timeWindow(Time.seconds(10))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
With Periodic Watermarks
AssignerWithPeriodicWatermarks分配时间戳并且周期性的生成水印(可能取决于流元素,或者纯粹基于处理时间)。
通过ExecutionConfig.setAutoWatermarkInterval(…)定义水印生成的间隔(每n毫秒)。每次都会调用assigner 的getCurrentWatermark()方法,如果水印不为空,且大于之前的水印,则会发出一个新的水印。
这里我们展示了两个使用周期性水印生成的时间戳分配器的简单示例。注意,Flink包含一个BoundedOutOfOrdernessTimestampExtractor,类似于下面所示的BoundedOutOfOrdernessGenerator案例,您可以在这里here阅读相关内容。
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.getCreationTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current highest timestamp minus the out-of-orderness bound
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
/**
* This generator generates watermarks that are lagging behind processing time by a fixed amount.
* It assumes that elements arrive in Flink after a bounded delay.
*/
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxTimeLag = 5000; // 5 seconds
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current time minus the maximum time lag
return new Watermark(System.currentTimeMillis() - maxTimeLag);
}
}
With Punctuated Watermarks
当使用一个特定的事件指出需要生成一个新的水印时,可以通过使用AssignerWithPunctuatedWatermarks类来实现。在这种情况下,Flink首先会调用extractTimestamp(…)方法来给元素分配一个时间戳,然后立即在该元素上执行checkAndGetNextWatermark(…)方法生产水印。
checkAndGetNextWatermark(…)方法传递extractTimestamp(…)方法中分配的时间戳,并可以决定是否要生成水印。每当checkAndGetNextWatermark(…)方法返回一个非空的水印,并且水印大于最新的水印时,就会发出新的水印。
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
}
}
注意:在每一个事件上都有可能产生水印。然而,由于每个水印都导致了一些下游的计算,过多的水印会降低性能。
Timestamps per Kafka Partition
当使用Apache Kafka作为数据源data source时,每个Kafka分区可能有一个简单的事件时间event time模式(升序的时间戳或无序有界)。然而,当从Kafka消费流时,多个分区经常被并行地消费,会交错的从分区中获取事件events,并且破坏了每个分区(per-partition)的模式(这是Kafka的消费者客户端工作的固有特性)。
在这种情况下,你可以使用Flink的Kafka-partition-aware方法生成水印。使用该特性,每个Kafka分区都会在Kafka消费者内部生成水印,并且每个分区的水印合并方法,与在流shuffles中合并水印的方法相同。
例如,如果事件时间戳严格按照Kafka分区升序,那么使用升序时间戳水印生成器(ascending timestamps watermark generator)生成每个分区的水印将得到完美的整体水印。
下图展示了如何使用每kafka分区生成水印,以及这种情况下水印如何在流数据流中传递。
FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {
@Override
public long extractAscendingTimestamp(MyType element) {
return element.eventTimestamp();
}
});
DataStream<MyType> stream = env.addSource(kafkaSource);
总结:
Assigning Timestamps:
(1)如果使用了时间戳分配器,则数据源中的任何时间戳和水印都将被覆盖;
(2)时间戳必须在第一个窗口算子之前指定;
(3)作为一种特殊情况,当使用Kafka作为数据源时,Flink允许在kafka源(或消费者)内部指定一个时间戳分配器/水印发射器。
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
while (/* condition */) {
MyType next = getNext();
ctx.collectWithTimestamp(next, next.getEventTimestamp());
if (next.hasWatermarkTime()) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
}
}
}
这种分式很少使用。
常用的两种自定义事件时间戳分配器和水印发射器的对比:
AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks:
AssignerWithPeriodicWatermarks更常用,而且当数据量很大时,效率更高;AssignerWithPunctuatedWatermarks一般适合TPS小于100的场景。
代码:实现AssignerWithPeriodicWatermarks接口:
public class OmsDetailWatermarkAssigner implements AssignerWithPeriodicWatermarks<JSONObject> {
private static final Logger LOG = LoggerFactory.getLogger(OmsDetailWatermarkAssigner.class);
private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private Long currentMaxTimestamp = 0l;
//水印延迟时间
private static final Long DELAY_TIME = 0 * 1000l;
private Watermark watermark = null;
@Nullable
@Override
public Watermark getCurrentWatermark() {
watermark = new Watermark(currentMaxTimestamp);
LOG.error("getCurrentWatermark:" + currentMaxTimestamp + " watermark: "+watermark.toString());
return watermark;
}
@Override
public long extractTimestamp(JSONObject element, long l) {
try {
String etlTime = element.getString("etl_time");
long etlTimestamp = sdf.parse(etlTime).getTime();
currentMaxTimestamp = Math.max(etlTimestamp, currentMaxTimestamp) - DELAY_TIME;
LOG.error("OmsDetailWatermarkAssigner:" + l + " watermark: "+watermark.toString());
return etlTimestamp;
} catch (ParseException e) {
LOG.error("the oms detatil orderTime parse fail! " + element.toString(), e);
}
return 0l;
}
}
getCurrentWatermark()方法先执行----》在执行extractTimestamp(JSONObject element, long l)方法,因此第一个元素的的水印时间是:0,第二个元素的水印时间时上个元素的event时间-延迟时间(currentMaxTimestamp = Math.max(etlTimestamp, currentMaxTimestamp) - DELAY_TIME)
生成水印的时间间隔(每n毫秒)是通过ExecutionConfig.setAutoWatermarkInterval(…)定义的。每次都会调用assigner的getCurrentWatermark()方法,如果返回的水印是非空且比前一个水印大,则会发出一个新的水印。
实现AssignerWithPunctuatedWatermarks接口:
public class OmsDetailWatermarkAssigner implements AssignerWithPeriodicWatermarks<JSONObject> {
private static final Logger LOG = LoggerFactory.getLogger(OmsDetailWatermarkAssigner.class);
private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private Long currentMaxTimestamp = 0l;
//水印延迟时间
private static final Long DELAY_TIME = 0 * 1000l;
/**
* 再执行该函数,watermarkTimestamp的值是方法extractTimestamp()的返回值
*
* @param lastElement 数据流元素
* @param watermarkTimestamp
* @return
*/
@Nullable
@Override
public Watermark checkAndGetNextWatermark(JSONObject lastElement, long watermarkTimestamp) {
LOG.error("currentMaxTimestamp:" + currentMaxTimestamp);
return new Watermark(currentMaxTimestamp);
}
/**
* 先执行该函数,从element中提取时间戳
*
* @param element 数据流元素
* @param currentTimestamp 当前的系统时间
* @return 数据的事件时间戳,触发器Trigger中的时间也是这个返回值
*/
@Override
public long extractTimestamp(JSONObject element, long currentTimestamp) {
LOG.error("OmsDetailWatermarkAssigner:" + currentTimestamp);
try {
String etlTime = element.getString("etl_time");
long etlTimestamp = sdf.parse(etlTime).getTime();
currentMaxTimestamp = Math.max(etlTimestamp, currentMaxTimestamp) - DELAY_TIME;
return etlTimestamp;
} catch (ParseException e) {
LOG.error("the oms detatil orderTime parse fail! " + element.toString(), e);
}
return 0l;
}
}
先执行extractTimestamp(JSONObject element, long currentTimestamp)方法----》在执行checkAndGetNextWatermark(JSONObject lastElement, long watermarkTimestamp)方法
方法,因此第一个元素的的水印时间是:-Long.Max=-9223372036854775808,第二个元素的水印时间时上个元素的event时间-延迟时间(currentMaxTimestamp = Math.max(etlTimestamp, currentMaxTimestamp) - DELAY_TIME)
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html
https://flink.sojb.cn/dev/event_timestamps_watermarks.html
https://www.jianshu.com/p/92a21b8004af