天天看點

Flink中Window詳解之Window的聚合函數AggregateFunction

package window

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * One station log record, as parsed from a six-field input line.
 *
 * @author yqq
 * @version 1.0 (2021/12/27 20:42)
 *
 * @param sid       station identifier; the job in this file groups and counts records by it
 * @param callOut   presumably the calling party (caller number) -- TODO confirm
 * @param callInput presumably the called party (callee number) -- TODO confirm
 * @param callType  presumably the call outcome/category -- TODO confirm allowed values
 * @param callTime  presumably the call timestamp; unit is not shown in this file
 * @param duration  presumably the call length; unit is not shown in this file
 */
case class StationLog(
    sid: String,
    callOut: String,
    callInput: String,
    callType: String,
    callTime: Long,
    duration: Long
)
/**
 * Demo job: counts log records per station over a sliding processing-time window.
 *
 * Text lines are read from a socket, parsed into [[StationLog]] records, and every
 * 5 seconds the number of records seen during the last 8 seconds is printed for
 * each `sid`.
 */
object AggregatFunctionTest {
  import scala.util.Try

  /**
   * Entry point: builds the streaming pipeline and runs it.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    // Brings the implicits required by the Scala DataStream API into scope.
    import org.apache.flink.streaming.api.scala._

    // Source: one text line per record, read from the socket at node1:8888.
    // Lines that cannot be parsed are skipped instead of failing the whole job.
    val stream: DataStream[StationLog] = environment
      .socketTextStream("node1", 8888)
      .flatMap((line: String) => parseLine(line).toList)

    // Every 5 seconds, count the records of the last 8 seconds for each station.
    stream
      .map(log => (log.sid, 1))
      .keyBy(_._1)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(8), Time.seconds(5))) // size 8s, slide 5s
      .aggregate(new MyAggregateFuntion, new MyWindowFunction)
      .print()

    environment.execute()
  }

  /**
   * Parses one input line of the form `sid,callOut,callInput,callType,callTime,duration`.
   * Every field is trimmed; fields beyond the sixth are ignored.
   *
   * @param line raw text line received from the socket
   * @return the parsed record, or `None` when the line has fewer than six fields or
   *         when `callTime` / `duration` is not a valid `Long`
   */
  private def parseLine(line: String): Option[StationLog] = {
    val fields = line.split(",").map(_.trim)
    val parsed =
      if (fields.length < 6) None
      else
        Try(
          StationLog(fields(0), fields(1), fields(2), fields(3), fields(4).toLong, fields(5).toLong)
        ).toOption
    // Report the dropped line so bad input does not disappear silently.
    if (parsed.isEmpty) System.err.println(s"Skipping malformed record: $line")
    parsed
  }

  /**
   * Attaches the window key to the count produced by [[MyAggregateFuntion]].
   *
   * Its input comes from `MyAggregateFuntion`: when the window ends, `getResult` of the
   * aggregate function runs first and `apply` is then called with that result.
   */
  class MyWindowFunction extends WindowFunction[Long, (String, Long), String, TimeWindow] {

    /**
     * Emits `(key, count)` for the window.
     *
     * @param key    the station id the window is keyed by
     * @param window the window being evaluated (not used)
     * @param input  the aggregated count; expected to hold exactly one value
     * @param out    collector receiving the `(key, count)` pair
     */
    override def apply(
        key: String,
        window: TimeWindow,
        input: Iterable[Long],
        out: Collector[(String, Long)]
    ): Unit =
      // Only the first value is used; an empty input emits nothing instead of throwing.
      input.headOption.foreach(count => out.collect((key, count)))
  }

  /**
   * Incrementally sums the second component of `(sid, n)` pairs into a `Long`.
   */
  class MyAggregateFuntion extends AggregateFunction[(String, Int), Long, Long] {

    /** Creates the initial accumulator, which starts at 0. */
    override def createAccumulator(): Long = 0L

    /** Called once per incoming element: adds the element's count to the accumulator. */
    override def add(value: (String, Int), accumulator: Long): Long = accumulator + value._2

    /** Called once when the window ends: the accumulator itself is the result. */
    override def getResult(accumulator: Long): Long = accumulator

    /** Combines two partial accumulators by adding them. */
    override def merge(a: Long, b: Long): Long = a + b
  }
}