天天看点

Flink 原理与实现:Window 机制

flink 认为 batch 是 streaming 的一个特例,所以 flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 streaming 到 batch 的一个桥梁。flink 提供了非常完善的窗口机制,这是我认为的 flink 最大的亮点之一(其他的亮点包括消息乱序处理,和 checkpoint 机制)。本文我们将介绍流式处理中的窗口概念,介绍 flink 内建的一些窗口和 window api,最后讨论下窗口在底层是如何实现的。

在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

窗口可以是时间驱动的(time window,例如:每30秒钟),也可以是数据驱动的(count window,例如:每一百个元素)。一种经典的窗口分类可以分成:翻滚窗口(tumbling window,无重叠),滚动窗口(sliding window,有重叠),和会话窗口(session window,活动间隙)。

我们举个具体的场景来形象地理解不同窗口的概念。假设,淘宝网会记录每个用户每次购买的商品个数,我们要做的是统计不同窗口中用户购买商品的总数。下图给出了几种经典的窗口切分概述图:

Flink 原理与实现:Window 机制

上图中,raw data stream 代表用户的购买行为流,圈中的数字代表该用户本次购买的商品个数,事件是按时间分布的,所以可以看出事件之间是有time gap的。flink 提供了上图中所有的窗口类型,下面我们会逐一进行介绍。

tumbling time window

如上图,我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被成为翻滚时间窗口(tumbling time window)。翻滚窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。通过使用 datastream api,我们可以这样实现:

sliding time window

但是对于某些应用,它们需要的窗口是不间断的,需要平滑地进行窗口聚合。比如,我们可以每30秒计算一次最近一分钟用户购买的商品总数。这种窗口我们称为滑动时间窗口(sliding time window)。在滑窗中,一个元素可以对应多个窗口。通过使用 datastream api,我们可以这样实现:

count window 是根据元素个数对数据流进行分组的。

tumbling count window

当我们想要每100个用户购买行为事件统计购买总数,那么每当窗口中填满100个元素了,就会对窗口进行计算,这种窗口我们称之为翻滚计数窗口(tumbling count window),上图所示窗口大小为3个。通过使用 datastream api,我们可以这样实现:

sliding count window

当然count window 也支持 sliding window,虽在上图中未描述出来,但和sliding time window含义是类似的,例如计算每10个元素计算一次最近100个元素的总和,代码示例如下。

在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。如上图所示,就是需要计算每个用户在活跃期间总共购买的商品数量,如果用户30秒没有活动则视为会话断开(假设raw data stream是单个用户的购买行为流)。session window 的示例代码如下:

一般而言,window 是在无限的流上定义了一个有限的元素集合。这个集合可以是基于时间的,元素个数的,时间和个数结合的,会话间隙的,或者是自定义的。flink 的 datastream api 提供了简洁的算子来满足常用的窗口操作,同时提供了通用的窗口机制来允许用户自己定义窗口分配逻辑。下面我们会对 flink 窗口相关的 api 进行剖析。

得益于 flink window api 松耦合设计,我们可以非常灵活地定义符合特定业务的窗口。flink 中定义一个窗口主要需要以下三个组件。

window assigner:用来决定某个元素被分配到哪个/哪些窗口中去。

trigger:触发器。决定了一个窗口何时能够被计算或清除,每个窗口都会拥有一个自己的trigger。

evictor:可以译为“驱逐者”。在trigger触发之后,在窗口被处理之前,evictor(如果有evictor的话)会用来剔除窗口中不需要的元素,相当于一个filter。

上述三个组件的不同实现的不同组合,可以定义出非常复杂的窗口。flink 中内置的窗口也都是基于这三个组件构成的,当然内置窗口有时候无法解决用户特殊的需求,所以 flink 也暴露了这些窗口机制的内部接口供用户实现自定义的窗口。下面我们将基于这三者探讨窗口的实现机制。

下图描述了 flink 的窗口机制以及各组件之间是如何相互工作的。

Flink 原理与实现:Window 机制

每一个窗口都拥有一个属于自己的 trigger,trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。每当有元素加入到该窗口,或者之前注册的定时器超时了,那么trigger都会被调用。trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。

当trigger fire了,窗口中的元素集合就会交给<code>evictor</code>(如果指定了的话)。evictor 主要用来遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。剩余的元素会交给用户指定的函数进行窗口的计算。如果没有 evictor 的话,窗口中的所有元素会一起交给函数进行计算。

计算函数收到了窗口的元素(可能经过了 evictor 的过滤),并计算出窗口的结果值,并发送给下游。窗口的结果值可以是一个也可以是多个。datastream api 上可以接收不同类型的计算函数,包括预定义的<code>sum()</code>,<code>min()</code>,<code>max()</code>,还有 <code>reducefunction</code>,<code>foldfunction</code>,还有<code>windowfunction</code>。windowfunction 是最通用的计算函数,其他的预定义的函数基本都是基于该函数实现的。

flink 对于一些聚合类的窗口计算(如sum,min)做了优化,因为聚合类的计算不需要将窗口中的所有数据都保存下来,只需要保存一个result值就可以了。每个进入窗口的元素都会执行一次聚合函数并修改result值。这样可以大大降低内存的消耗并提升性能。但是如果用户定义了 evictor,则不会启用对聚合窗口的优化,因为 evictor 需要遍历窗口中的所有元素,必须要将窗口中所有元素都存下来。

上述的三个组件构成了 flink 的窗口机制。为了更清楚地描述窗口机制,以及解开一些疑惑(比如 purge 和 evictor 的区别和用途),我们将一步步地解释 flink 内置的一些窗口(time window,count window,session window)是如何实现的。

count window 是使用三组件的典范,我们可以在 <code>keyedstream</code> 上创建 count window,其源码如下所示:

第一个函数是申请翻滚计数窗口,参数为窗口大小。第二个函数是申请滑动计数窗口,参数分别为窗口大小和滑动大小。它们都是基于 <code>globalwindows</code> 这个 windowassigner 来创建的窗口,该assigner会将所有元素都分配到同一个global window中,所有<code>globalwindows</code>的返回值一直是 <code>globalwindow</code> 单例。基本上自定义的窗口都会基于该assigner实现。

翻滚计数窗口并不带evictor,只注册了一个trigger。该trigger是带purge功能的 counttrigger。也就是说每当窗口中的元素数量达到了 window-size,trigger就会返回fire+purge,窗口就会执行计算并清空窗口中的所有元素,再接着储备新的元素。从而实现了tumbling的窗口之间无重叠。

滑动计数窗口的各窗口之间是有重叠的,但我们用的 globalwindows assinger 从始至终只有一个窗口,不像 sliding time assigner 可以同时存在多个窗口。所以trigger结果不能带purge,也就是说计算完窗口后窗口中的数据要保留下来(供下个滑窗使用)。另外,trigger的间隔是slide-size,evictor的保留的元素个数是window-size。也就是说,每个滑动间隔就触发一次窗口计算,并保留下最新进入窗口的window-size个元素,剔除旧元素。

假设有一个滑动计数窗口,每2个元素计算一次最近4个元素的总和,那么窗口工作示意图如下所示:

Flink 原理与实现:Window 机制

图中所示的各个窗口逻辑上是不同的窗口,但在物理上是同一个窗口。该滑动计数窗口,trigger的触发条件是元素个数达到2个(每进入2个元素就会触发一次),evictor保留的元素个数是4个,每次计算完窗口总和后会保留剩余的元素。所以第一次触发trigger是当元素5进入,第三次触发trigger是当元素2进入,并驱逐5和2,计算剩余的4个元素的总和(22)并发送出去,保留下2,4,9,7元素供下个逻辑窗口使用。

同样的,我们也可以在 <code>keyedstream</code> 上申请 time window,其源码如下所示:

在方法体内部会根据当前环境注册的时间类型,使用不同的windowassigner创建window。可以看到,eventtime和ingesttime都使用了<code>xxxeventtimewindows</code>这个assigner,因为eventtime和ingesttime在底层的实现上只是在source处为record打时间戳的实现不同,在window operator中的处理逻辑是一样的。

这里我们主要分析sliding process time window,如下是相关源码:

首先,<code>slidingprocessingtimewindows</code>会对每个进入窗口的元素根据系统时间分配到<code>(size / slide)</code>个不同的窗口,并会在每个窗口上根据窗口结束时间注册一个定时器(相同窗口只会注册一份),当定时器超时时意味着该窗口完成了,这时会回调对应窗口的trigger的<code>onprocessingtime</code>方法,返回fire_and_purge,也就是会执行窗口计算并清空窗口。整个过程示意图如下:

Flink 原理与实现:Window 机制

如上图所示横轴代表时间戳(为简化问题,时间戳从0开始),第一条record会被分配到[-5,5)和[0,10)两个窗口中,当系统时间到5时,就会计算[-5,5)窗口中的数据,并将结果发送出去,最后清空窗口中的数据,释放该窗口资源。

<a href="https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#time-and-windows">flink concepts</a>

[introducing stream windows in apache flink

<a href="https://cwiki.apache.org/confluence/display/flink/streaming+window+join+rework">streaming window join rework</a>

<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageid=60624830">window semantics (and implementation)</a>

<a href="http://blog.madhukaraphatak.com/introduction-to-flink-streaming-part-6">introduction to flink streaming - part 6 : anatomy of window api</a>

<a href="http://blog.madhukaraphatak.com/introduction-to-flink-streaming-part-5">introduction to flink streaming - part 5 : window api in flink</a>