天天看点

Flink之window函数详解

1、官网:

 https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/programming-model.html#windows      (建议大家多看看官网) 

2、什么是Window

      Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。Flink 提供了 非常完善的窗口机制,这是我认为的 Flink 最大的亮点之一。

     在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当 然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过 去的 1 分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来 收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

       聚合事件(例如 count、sum)在流上的工作方式与在批处理中不同。例如,不可能计算流中的 所有元素,因为流通常是无限的(无界的)。相反,流上的聚合(count、sum 等)由窗口限定范 围,例如“过去 5 分钟内的计数”或“最后 100 个元素的总和”。也就是说,流数据的计算 可以把连续不断的数据按照一定的规则拆分成大量的片段,在片段内进行统计和计算。比如 可以把一小时内的数据保存到一个小的数据库表里,然后对这部分数据进行计算和统计,这 时流计算是提供自动切割的一种机制-窗口。

Window 可以是时间驱动的(例如:每 30 秒),也可以是数据驱动的(例如:每 100 个元素):

  •     以时间为单位的 Time Window,例如:每 1 秒钟、每 1 个小时等
  •     以数据的数量为单位的 Count Window,例如:每 100 个元素

Flink 给我们提供了一些通用的时间窗口模型:

  1. 翻滚窗口(tumbling window,没有重叠)
  2. 滑动窗口(sliding window,有重叠)
  3. 会话窗口(session window,中间有一个不活动的间隙)

3、滚动窗口 Tumbling Window

      滚动窗口分配器将每个元素分配给固定窗口大小的窗口。滚动窗口大小固定的并且不重叠。 例如,如果指定大小为 5 分钟的滚动窗口,则将执行当前窗口,并且每五分钟将启动一个新 窗口,如下图所示:

Flink之window函数详解

实例:

  1、需求:我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进 行切分

Tumbling Time Window      使用 DataStream API

代码:

package com.qyl.window
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * 作者: 不是猿是员
 * 时间: 2019/05/10
 * 描述:
 * wordcount time window 操作
 */
object Window_Time {
 def main(args: Array[String]): Unit = {
 val tool = ParameterTool.fromArgs(args)
 val port = tool.getInt("port")
 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
 val dataStream: DataStream[String] = streamEnv.socketTextStream("hadoop02",
port)
 /**
 * 每隔 4s 统计一次词频
 */
 val resultDataStream: DataStream[(String, Int)] = dataStream.flatMap(x =>
x.split(" "))
 .map((_, 1))
 .keyBy(0)
 .timeWindow(Time.seconds(4))
 .sum(1)
 resultDataStream.print()
 streamEnv.execute("WordCount every 10 second")
 }
}
           

 2、需求: 我们想要每 100 个用户购买行为事件统计购买总数,那么每当窗口中填满 100 个元素了, 就会对窗口进行计算

Tumbling Count Window         使用 DataStream API

代码:

package com.mazh.window
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * 作者: 不是猿是员
 * 时间: 2019/05/10 
 * 描述:
 * Tumbling Count Window
 */
object Window_Count {
 def main(args: Array[String]): Unit = {
 val tool = ParameterTool.fromArgs(args)
 val port = tool.getInt("port")
 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
 val dataStream: DataStream[String] = streamEnv.socketTextStream("hadoop02",
port)
/**
 * 每隔 10 个单词统计一次词频
 */
 val resultDataStream: DataStream[(String, Int)] = dataStream.flatMap(x =>
x.split(" "))
 .map((_, 1))
 .keyBy(0)
 .countWindow(10)
 .sum(1)
 resultDataStream.print()
 streamEnv.execute("WordCount every 10 word")
 }
}
           

4、滑动窗口    Sliding Window

       滑动窗口分配器将每个元素分配给固定窗口大小的窗口。类似于滚动窗口分配器,窗口的大 小由窗口大小参数配置。另外一个窗口滑动参数控制滑动窗口的启动频率(how frequently a sliding window is started)。因此,如果滑动大小小于窗口大小,滑动窗可以重叠。在这种情 况下,元素被分配到多个窗口。例如,你可以使用窗口大小为 10 分钟的窗口,滑动大小为 5 分钟。这样,每 5 分钟会生成一个窗口,包含最后 10 分钟内到达的事件,如下图所示。

Flink之window函数详解

实例:

1、 需求:每 30 秒计算一次最近一分钟用户购买的商品总数

Sliding Time Window

package com.qyl.window
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * 作者: 不是猿是员
 * 时间: 2019/5/10 
 * 描述:
 * wordcount time window 操作
 */
object Window_Time {
 def main(args: Array[String]): Unit = {
 val tool = ParameterTool.fromArgs(args)
 val port = tool.getInt("port")
 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
 val dataStream: DataStream[String] = streamEnv.socketTextStream("hadoop02",
port)
 /**
 * 每隔 30s 统计过去 60s 的 wordcount
 */
 val resultDataStream1: DataStream[(String, Int)] = dataStream.flatMap(x =>
x.split(" "))
 .map((_, 1))
 .keyBy(0)
 .timeWindow(Time.seconds(30), Time.seconds(60))
 .sum(1)
 resultDataStream1.print()
 streamEnv.execute("WordCount every 10 second")
 }
}
           

2、需求: 每隔 10 个单词统计过去 20 个单词

Sliding Count Window

代码:

package com.mazh.window
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * 作者: 不是猿是员
 * 时间: 2019/05/10 
 * 描述:
 * Tumbling Count Window
 */
object Window_Count {
 def main(args: Array[String]): Unit = {
 val tool = ParameterTool.fromArgs(args)
 val port = tool.getInt("port")
 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
 val dataStream: DataStream[String] = streamEnv.socketTextStream("hadoop02",
port)
 /**
 * 每隔 10 个单词统计过去 20 个单词的 wordcount
 */
 val resultDataStream1: DataStream[(String, Int)] = dataStream.flatMap(x =>
x.split(" "))
 .map((_, 1))
 .keyBy(0)
 .countWindow(10, 20)
 .sum(1)
 resultDataStream1.print()
 streamEnv.execute("WordCount every 10 word")
 }
}
           

5、会话窗口  Session Window

         会话窗口分配器通过活动会话分组元素。与滚动窗口和滑动窗口相比,会话窗口不会重叠, 也没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到元素时会关闭, 例如,不活动的间隙时。会话窗口分配器配置会话间隙,定义所需的不活动时间长度(defineshow long is the required period of inactivity)。当此时间段到期时,当前会话关闭,后续元素被 分配到新的会话窗口。

Flink之window函数详解

       在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃 的周期),由非活跃的间隙给分隔开。就是需要计算每个用户在活跃期间总共购买的商品数 量,如果用户 30 分钟没有活动则视为会话断开(假设 raw data stream 是单个用户的购买行 为流)。

5、对比

      一般而言,window 是在无限的流上定义了一个有限的元素集合。这个集合可以是基于时间 的,元素个数的,时间和个数结合的,会话间隙的,或者是自定义的。Flink 的 DataStream API 提供了简洁的算子来满足常用的窗口操作,同时提供了通用的窗口机制来允许用户自己 定义窗口分配逻辑。

Flink之window函数详解

6、自定义Window

     得益于 Flink Window API 松耦合设计,我们可以非常灵活地定义符合特定业务的窗口。Flink 中定义一个窗口主要需要以下三个组件。

  1. Window Assigner:用来决定某个元素被分配到哪个/哪些窗口中去。
  2. Trigger:触发器。决定了一个窗口何时能够被计算或清除,每个窗口都会拥有一个自 己的 Trigger。
  3. Evictor:可以译为“驱逐者”。在 Trigger 触发之后,在窗口被处理之前,Evictor(如果 有 Evictor 的话)会用来剔除窗口中不需要的元素,相当于一个 filter。

上述三个组件的不同实现的不同组合,可以定义出非常复杂的窗口。Flink 中内置的窗口也 都是基于这三个组件构成的,当然内置窗口有时候无法解决用户特殊的需求,所以 Flink 也 暴露了这些窗口机制的内部接口供用户实现自定义的窗口。

Flink之window函数详解

继续阅读