// 天天看点
//
// Flink中Window详解之Window的聚合函数AggregateFunction
// (Flink windows in depth: the window aggregate function, AggregateFunction)

package window

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * One call-log record emitted by a base station, parsed from a comma-separated text line
 * whose fields appear in the same order as the parameters below.
 *
 * @param sid       base-station id; the streaming job keys and counts logs by this field
 * @param callOut   presumably the calling party's number — TODO confirm
 * @param callInput presumably the called party's number — TODO confirm
 * @param callType  call type/status as raw text (possible values are not visible here)
 * @param callTime  call timestamp; unit not visible here (assumed epoch millis — TODO confirm)
 * @param duration  call duration; unit not visible here — TODO confirm
 *
 * @Author yqq
 * @Date 2021/12/27 20:42
 * @Version 1.0
 */
case class StationLog(
    sid: String,
    callOut: String,
    callInput: String,
    callType: String,
    callTime: Long,
    duration: Long
)
/**
 * Flink demo of incremental window aggregation.
 *
 * Every 5 seconds it counts how many call logs each base station produced during the most
 * recent 8 seconds (processing time). The count is computed incrementally by
 * [[MyAggregateFuntion]]. [[MyWindowFunction]] then pairs it with the station id.
 *
 * Input is read from a text socket, one comma-separated record per line:
 * `sid,callOut,callInput,callType,callTime,duration`.
 */
object AggregatFunctionTest {

  /**
   * Builds and runs the streaming job.
   *
   * @param args optional `host port` of the socket source. Defaults to `node1 8888`, the
   *             values this job originally hard-coded. A non-numeric port fails at launch.
   */
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._

    val host = args.headOption.getOrElse("node1")
    val port = args.lift(1).map(_.toInt).getOrElse(8888)

    // Read the source. A malformed line would otherwise throw inside the operator and fail
    // the whole job, so it is reported and skipped instead.
    val stream: DataStream[StationLog] = environment.socketTextStream(host, port)
      .flatMap { line =>
        val parsed = parseStationLog(line)
        if (parsed.isEmpty) System.err.println(s"Skipping malformed station log line: $line")
        parsed.toList
      }

    // Every 5 seconds, count each station's logs over the last 8 seconds (sliding window).
    stream.map(log => (log.sid, 1))
      .keyBy(_._1)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(8), Time.seconds(5)))
      .aggregate(new MyAggregateFuntion, new MyWindowFunction)
      .print()

    environment.execute()
  }

  /**
   * Parses one comma-separated line into a [[StationLog]].
   *
   * Each field is trimmed. Fields beyond the sixth are ignored, matching the original lenient
   * parsing.
   *
   * @return `None` if the line has fewer than 6 fields, or if `callTime`/`duration` are not
   *         valid longs
   */
  private def parseStationLog(line: String): Option[StationLog] = {
    val fields = line.split(",").map(_.trim)
    if (fields.length < 6) None
    else
      scala.util.Try(
        StationLog(fields(0), fields(1), fields(2), fields(3), fields(4).toLong, fields(5).toLong)
      ).toOption
  }

  /**
   * Emits `(stationId, count)` for each fired window.
   *
   * It is paired with [[MyAggregateFuntion]] in `aggregate(...)`. When a window fires, Flink
   * first calls the aggregate function's `getResult` and hands that single pre-aggregated
   * value to `apply`, so `input` contains exactly one element.
   */
  class MyWindowFunction extends WindowFunction[Long, (String, Long), String, TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[(String, Long)]): Unit =
      out.collect((key, input.head))
  }

  /**
   * Incrementally counts a window's elements by summing the `Int` of each `(key, n)` tuple.
   * Here every element is `(stationId, 1)`. The accumulator and the result are the running
   * total as a `Long`.
   */
  class MyAggregateFuntion extends AggregateFunction[(String, Int), Long, Long] {
    // Each new window starts counting from zero.
    override def createAccumulator(): Long = 0L

    // Called once for every element assigned to the window.
    override def add(value: (String, Int), accumulator: Long): Long = accumulator + value._2

    // Called when the window fires; the count is passed on unchanged.
    override def getResult(accumulator: Long): Long = accumulator

    // Combines partial counts. Flink only needs this when windows merge (e.g. session windows).
    override def merge(a: Long, b: Long): Long = a + b
  }
}