package window
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
 * One log record reported by a base station, built from a comma-separated text line.
 *
 * Created: 2021/12/27 20:42
 *
 * @param sid       first field of the line; the job in this file groups records by it
 *                  (presumably the station id)
 * @param callOut   second field (presumably the calling number; confirm with the data source)
 * @param callInput third field (presumably the called number; confirm with the data source)
 * @param callType  fourth field; the set of valid values is not defined in this file
 * @param callTime  fifth field, parsed as a Long (presumably a timestamp; unit not stated here)
 * @param duration  sixth field, parsed as a Long (unit not stated here)
 * @author yqq
 * @version 1.0
 */
case class StationLog(
    sid: String,
    callOut: String,
    callInput: String,
    callType: String,
    callTime: Long,
    duration: Long
)
object AggregatFunctionTest {
/**
 * Entry point: every 5 seconds, counts the log records of each base station seen
 * during the most recent 8 seconds (sliding processing-time window).
 *
 * Text lines are read from a socket at node1:8888. Each line is expected to hold
 * at least six comma-separated fields in the order of [[StationLog]]'s parameters.
 * Lines that cannot be parsed are skipped instead of failing the job.
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
  val environment = StreamExecutionEnvironment.getExecutionEnvironment
  import org.apache.flink.streaming.api.scala._

  // Read the data source. flatMap (instead of map) lets a malformed line produce
  // no record at all, so a single bad line cannot bring down the streaming job.
  val stream: DataStream[StationLog] = environment
    .socketTextStream("node1", 8888)
    .flatMap((line: String) => parseStationLog(line).toList)

  // Open the window: key by station id, then apply a sliding window of size 8s that slides every 5s.
  stream
    .map(log => (log.sid, 1))
    .keyBy(_._1)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(8), Time.seconds(5)))
    .aggregate(new MyAggregateFuntion, new MyWindowFunction)
    .print()

  environment.execute()
}

/**
 * Parses one comma-separated line into a [[StationLog]].
 *
 * Fields are trimmed before use; fields beyond the sixth are ignored.
 *
 * @param line raw text line read from the socket
 * @return the parsed record, or None when the line has fewer than six fields or
 *         when the fifth or sixth field is not a valid Long
 */
private def parseStationLog(line: String): Option[StationLog] = {
  val fields = line.split(",").map(_.trim)
  if (fields.length < 6) {
    None
  } else {
    try {
      Some(StationLog(fields(0), fields(1), fields(2), fields(3), fields(4).toLong, fields(5).toLong))
    } catch {
      // Only the numeric conversions can fail here; treat such a line as malformed.
      case _: NumberFormatException => None
    }
  }
}
/**
 * Window function that pairs the window's key with the pre-aggregated count.
 *
 * Its input comes from MyAggregateFuntion: when a window ends, that aggregate's
 * getResult is executed first, and apply is then invoked with the result.
 */
class MyWindowFunction extends WindowFunction[Long, (String, Long), String, TimeWindow] {
  override def apply(
      key: String,
      window: TimeWindow,
      input: Iterable[Long],
      out: Collector[(String, Long)]
  ): Unit = {
    // The iterator holds only one value (the aggregated result), so the first element is the count.
    val aggregated = input.iterator
    val count = aggregated.next()
    out.collect(key -> count)
  }
}
/**
 * Incremental aggregate that sums the Int component of (String, Int) records.
 *
 * Input type is (String, Int); both the accumulator and the result are Long.
 */
class MyAggregateFuntion extends AggregateFunction[(String, Int), Long, Long] {

  /** Creates the initial accumulator, which starts at 0. */
  override def createAccumulator(): Long = 0L

  /** Runs once per incoming record: adds the record's Int component to the accumulator. */
  override def add(value: (String, Int), accumulator: Long): Long = {
    val increment = value._2
    accumulator + increment
  }

  /** Runs once when the window ends: the accumulator is returned unchanged. */
  override def getResult(accumulator: Long): Long = {
    accumulator
  }

  /** Combines two accumulators by adding them together. */
  override def merge(a: Long, b: Long): Long = {
    a + b
  }
}
}