
Flink - Generating Timestamps / Watermarks

https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/event_timestamps_watermarks.html

To work with event time, streaming programs need to set the time characteristic accordingly.

So the first step is to set the time characteristic to event time.

In order to work with event time, Flink needs to know the events' timestamps, meaning each element in the stream needs to get its event timestamp assigned. That usually happens by accessing/extracting the timestamp from some field in the element.

Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about the progress in event time.

There are two ways to assign timestamps and generate watermarks:

1. Directly in the data stream source
2. Via a TimestampAssigner / WatermarkGenerator

Next, we need to define how to obtain the event time and how to generate watermarks.

One way is to hard-code it directly in the source:
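The post originally showed this as a code image that has not survived. The following is a rough, Flink-free Python model of a source that attaches timestamps itself. It is not Flink's API. The hypothetical `SourceContext` class stands in for Flink's source context, and its two methods imitate Flink's `collectWithTimestamp` and `emitWatermark`. The `Event` fields `ts` and `watermark` are assumptions made for the sketch.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class Event:
    payload: str
    ts: int                          # event time in ms (hypothetical field)
    watermark: Optional[int] = None  # set only on events that carry a watermark marker


@dataclass
class SourceContext:
    """Toy stand-in for Flink's source context: it only records what the source emits."""
    output: List[Tuple[str, object]] = field(default_factory=list)

    def collect_with_timestamp(self, element: Event, timestamp: int) -> None:
        self.output.append(("record", (element.payload, timestamp)))

    def emit_watermark(self, watermark: int) -> None:
        self.output.append(("watermark", watermark))


def run_source(events: List[Event], ctx: SourceContext) -> None:
    # The source itself attaches each timestamp and decides when to emit a watermark.
    for e in events:
        ctx.collect_with_timestamp(e, e.ts)
        if e.watermark is not None:
            ctx.emit_watermark(e.watermark)


# Usage: two events, the second one carrying a watermark marker.
ctx = SourceContext()
run_source([Event("a", 1000), Event("b", 2000, watermark=1500)], ctx)
```

Because the timestamp logic lives inside the source, every source has to repeat it. That is part of why the post moves on to the assigner approach.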


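This approach is clearly crude and inconvenient. In addition, timestamps and watermarks assigned this way are overwritten if a TimestampAssigner is applied later.

So let's look at the second approach.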

Timestamp Assigners / Watermark Generators

Timestamp assigners take a stream and produce a new stream with timestamped elements and watermarks. If the original stream already had timestamps or watermarks, the timestamp assigner overwrites them.

Timestamp assigners are usually specified immediately after the data source, but this is not strictly required. A common pattern, for example, is to parse (MapFunction) and filter (FilterFunction) before the timestamp assigner. In any case, the timestamp assigner needs to be specified before the first operation on event time (such as the first window operation).

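Typically, a few map and filter operations are added after the source for initialization or formatting.

The assigner is then set before any operation that uses event time, such as a window.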

Here is an example:
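The example itself was an image. The Python sketch below is a Flink-free model of the same ordering: parse (map), filter, assign timestamps, and only then group into event-time windows. It assumes a made-up comma-separated record layout, a WARNING filter, and a 10-second tumbling window. Watermarks are left out to keep the focus on where the assigner sits.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical record layout: "timestamp_ms,severity,group,value"
RAW = [
    "1000,WARNING,g1,1",
    "3000,INFO,g1,5",
    "9000,WARNING,g1,2",
    "12000,WARNING,g2,4",
]


def parse(line: str) -> dict:                  # plays the role of a MapFunction
    ts, severity, group, value = line.split(",")
    return {"ts": int(ts), "severity": severity, "group": group, "value": int(value)}


def is_warning(event: dict) -> bool:          # plays the role of a FilterFunction
    return event["severity"] == "WARNING"


def assign_timestamp(event: dict) -> Tuple[int, dict]:
    # The timestamp assigner runs after parse/filter but before any windowing.
    return event["ts"], event


def tumbling_window_sum(stamped: List[Tuple[int, dict]], size_ms: int) -> Dict[Tuple[str, int], int]:
    # Event-time windowing: the bucket is chosen from the assigned timestamp,
    # which is why the assigner has to come before this step.
    sums: Dict[Tuple[str, int], int] = defaultdict(int)
    for ts, event in stamped:
        window_start = ts - ts % size_ms
        sums[(event["group"], window_start)] += event["value"]
    return dict(sums)


events = [parse(line) for line in RAW]
stamped = [assign_timestamp(e) for e in events if is_warning(e)]
result = tumbling_window_sum(stamped, 10_000)
```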


So how is a timestamp assigner implemented, for example the MyTimestampsAndWatermarks used in the example?

There are three kinds:

With Ascending Timestamps

The simplest case for generating watermarks is the case where timestamps within one source occur in ascending order. In that case, the current timestamp can always act as a watermark, because no lower timestamps will occur any more.
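Here is a minimal Python model of the ascending case. It assumes each element has a hypothetical `created` timestamp field. It is not Flink's extractor class: the watermark is simply the latest timestamp, as the paragraph above describes. An out-of-order element raises an error, which is a choice made for this sketch only.

```python
from typing import Iterable, List, Optional, Tuple


class AscendingAssigner:
    """Model of the ascending case: the latest timestamp doubles as the watermark."""

    def __init__(self) -> None:
        self.current: Optional[int] = None

    def extract_timestamp(self, element: dict) -> int:
        ts = element["created"]  # hypothetical timestamp field
        if self.current is not None and ts < self.current:
            # The model assumes non-decreasing input and flags violations.
            raise ValueError(f"timestamp {ts} is lower than {self.current}")
        self.current = ts
        return ts

    def current_watermark(self) -> Optional[int]:
        # Per the description above, the current timestamp serves as the watermark.
        return self.current


def run(elements: Iterable[dict]) -> List[Tuple[int, Optional[int]]]:
    assigner = AscendingAssigner()
    out: List[Tuple[int, Optional[int]]] = []
    for e in elements:
        ts = assigner.extract_timestamp(e)
        out.append((ts, assigner.current_watermark()))
    return out
```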


I doubt anyone uses this one; you might as well use processing time directly.

With Periodic Watermarks

The <code>AssignerWithPeriodicWatermarks</code> assigns timestamps and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time).

The interval (every n milliseconds) in which the watermark will be generated is defined via <code>ExecutionConfig.setAutoWatermarkInterval(...)</code>. Each time, the assigner's <code>getCurrentWatermark()</code> method will be called, and a new watermark will be emitted if the returned watermark is non-null and larger than the previous watermark.

Watermarks are emitted periodically, and you can set the frequency via <code>ExecutionConfig.setAutoWatermarkInterval(...)</code>.
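This small Python simulation (not Flink code) makes the emission rule concrete. Each firing of the periodic timer calls a `getCurrentWatermark()`-like function and forwards the result only if it is non-null and larger than the last emitted watermark. The sketch counts timer firings (`ticks`, a hypothetical parameter) instead of modelling the actual interval set by `setAutoWatermarkInterval`.

```python
from typing import Callable, List, Optional


def periodic_emitter(get_current_watermark: Callable[[], Optional[int]],
                     ticks: int) -> List[int]:
    """Simulate `ticks` firings of the auto-watermark timer.

    Each firing asks the assigner for its current watermark; the value is emitted
    only if it is not None and strictly larger than the last emitted watermark.
    """
    emitted: List[int] = []
    last: Optional[int] = None
    for _ in range(ticks):
        wm = get_current_watermark()
        if wm is not None and (last is None or wm > last):
            emitted.append(wm)
            last = wm
    return emitted
```

For example, if five firings return `None, 100, 100, 90, 250`, only `100` and `250` are emitted. The repeated `100` and the smaller `90` are dropped.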


The two cases above differ as follows: the first sets the watermark based on the maximum event time seen so far, while the second sets it based on the current processing time.
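The two generators from the original images are not preserved, so the Python classes below only reconstruct the idea just described. They are not Flink's classes. The names echo the `BoundedOutOfOrdernessGenerator` / `TimeLagWatermarkGenerator` examples in the Flink docs. Subtracting a bound (`max_out_of_orderness_ms`) or a lag (`max_time_lag_ms`) is an assumption of this sketch, so that the watermark trails the newest event or the clock a little. The clock is injectable so the processing-time version can be tested.

```python
import time
from typing import Callable, Optional


class BoundedOutOfOrdernessGenerator:
    """Watermark follows the largest event time seen so far, minus an allowed lateness."""

    def __init__(self, max_out_of_orderness_ms: int) -> None:
        self.max_out_of_orderness_ms = max_out_of_orderness_ms
        self.current_max_ts: Optional[int] = None

    def extract_timestamp(self, event_ts: int) -> int:
        if self.current_max_ts is None or event_ts > self.current_max_ts:
            self.current_max_ts = event_ts
        return event_ts

    def get_current_watermark(self) -> Optional[int]:
        if self.current_max_ts is None:
            return None  # nothing seen yet, so no watermark
        return self.current_max_ts - self.max_out_of_orderness_ms


class TimeLagWatermarkGenerator:
    """Watermark follows the processing-time clock, minus a fixed lag."""

    def __init__(self, max_time_lag_ms: int,
                 clock_ms: Callable[[], int] = lambda: int(time.time() * 1000)) -> None:
        self.max_time_lag_ms = max_time_lag_ms
        self.clock_ms = clock_ms  # injectable clock, defaults to wall-clock milliseconds

    def extract_timestamp(self, event_ts: int) -> int:
        return event_ts  # timestamps still come from the events themselves

    def get_current_watermark(self) -> int:
        return self.clock_ms() - self.max_time_lag_ms
```

The first generator's watermark only advances when newer events arrive. The second advances with the wall clock even when no events arrive, regardless of their content.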

With Punctuated Watermarks

To generate watermarks whenever a certain event indicates that a new watermark can be generated, use the <code>AssignerWithPunctuatedWatermarks</code>. For this class, Flink will first call the <code>extractTimestamp(...)</code> method to assign the element a timestamp, and then immediately call the <code>checkAndGetNextWatermark(...)</code> method for that element.

The <code>checkAndGetNextWatermark(...)</code> method gets the timestamp that was assigned in the <code>extractTimestamp(...)</code> method, and can decide whether it wants to generate a watermark. Whenever the <code>checkAndGetNextWatermark(...)</code> method returns a non-null watermark, and that watermark is larger than the latest previous watermark, that new watermark will be emitted.

Here, watermarks are not triggered by time but by specific events: a watermark is generated only when certain special events or messages are received.

That is why the method is called <code>checkAndGetNextWatermark</code>: it has to check first.
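Here is a Flink-free Python model of the punctuated flow described above. The timestamp is extracted first, and in this sketch the element is forwarded next. The same element is then checked, and a watermark is emitted only if the check returns one larger than the previous watermark. The `ts` and `is_marker` fields are hypothetical. The method names only mirror `extractTimestamp` and `checkAndGetNextWatermark`.

```python
from typing import Iterable, List, Optional, Tuple


class PunctuatedAssigner:
    """Produces a watermark only for elements that carry a special marker."""

    def extract_timestamp(self, element: dict) -> int:
        return element["ts"]  # hypothetical timestamp field

    def check_and_get_next_watermark(self, element: dict, extracted_ts: int) -> Optional[int]:
        # Check first: only marker elements yield a watermark.
        return extracted_ts if element.get("is_marker") else None


def run(assigner: PunctuatedAssigner, elements: Iterable[dict]) -> List[Tuple[str, int]]:
    out: List[Tuple[str, int]] = []
    last_wm: Optional[int] = None
    for e in elements:
        ts = assigner.extract_timestamp(e)                  # 1. assign the timestamp
        out.append(("record", ts))
        wm = assigner.check_and_get_next_watermark(e, ts)   # 2. then check this element
        if wm is not None and (last_wm is None or wm > last_wm):
            out.append(("watermark", wm))
            last_wm = wm
    return out
```

In this model, a marker whose timestamp is not larger than the previous watermark is ignored, matching the "larger than the latest previous watermark" rule quoted above.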

