flink中支援多種視窗,包括:時間視窗,session視窗,統計視窗等等,能想到的基本都可以實作
時間視窗(Time Windows)
最簡單常用的視窗形式是基于時間的視窗,flink支援三種種時間視窗:
第一個: 翻滾時間視窗(tumbling time window)
翻滾時間視窗的視窗是固定的,比如設定一個1分鐘的時間視窗,該時間視窗将隻計算目前1分鐘内的資料,而不會管前1分鐘或後1分鐘的資料。
時間是對齊的,資料不會同時出現在2個視窗内,不會重疊

第二個:滑動時間視窗(sliding time window)
滑動視窗,顧名思義,該時間視窗是滑動的。是以,從概念上講,這裡有兩個方面的概念需要了解:
視窗:需要定義視窗的大小
滑動:需要定義在視窗中滑動的大小,但理論上講滑動的大小不能超過視窗大小
滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗長度和滑動間隔組成
視窗長度是固定的,可以有重疊的部分
第三個: 會話視窗(Session Windows)
由一系列事件組合一個指定時間長度的timeout間隙組成,也就是一段時間沒有接收到新資料就會生成新的視窗
主要特點就是: 時間無對齊
window() 方法接收的輸入參數是一個WindowAssigner
WindowAssigner 負責将每條輸入的資料分發到正确的window中
Flink提供了通用的WindowAssigner
滾動視窗(tumbling window)
滑動視窗(sliding window)
會話視窗(session window)
全局視窗(global window)
建立不同類型的視窗
滾動時間視窗(tumbling time window)
timeWindow(Time.seconds(15))
滑動時間視窗(sliding time window)
.timeWindow(Time.seconds(15),Time.seconds(5))
會話視窗(session window)
.window(EventTimeSessionWindows.withGap(Time.minutes(10))
視窗函數(window function)
window function 定義了要對視窗中收集的資料做的計算操作,可以分為兩類;
增量聚合函數(incrementalggergation functions)
每條資料來了就會進行計算,保持一個簡單的狀态
ReduceFunction, AggregateFunction
全視窗函數(full windowfunctions)
先把視窗所有資料收集起來,等到計算的時候會周遊所有資料
ProcessWindowFunction
其他一些常用的API
.trigger()---------觸發器
定義window什麼時候關閉,觸發計算并輸出結果
.evicotr()---------移除器
定義移除某些資料的邏輯
.allowedLateness() ------允許處理遲到的資料
.sideOutputLateData() -----将遲到的資料放入側輸出流
.getSideOutput() ----擷取側輸出流
理論說半天其實還是萌的,上個栗子
假設從檔案讀一批資料,每15秒統計一次,擷取視窗内各傳感器所有溫度的最小值,以及最小的時間戳
package com.mafei.apitest
import com.mafei.sinktest.SensorReadingTest5
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
object WindowTest {
def main(args: Array[String]): Unit = {
//建立執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //以事件時間作為視窗聚合
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) //以資料進入flink的時間作為視窗時間
// env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //以Flink實際處理時間作為視窗時間
//如果發現沒有輸出,那可能是因為資料太少,不到15s都處理完成了,可以換成socket或者kafka來進行測試
val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
inputStream.print()
//先轉換成樣例類類型
val dataStream = inputStream
.map(data => {
val arr = data.split(",") //按照,分割資料,擷取結果
SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的資料,參數中傳toLong和toDouble是因為預設分割後是字元串類别
})
//每15秒統計一次,視窗内各傳感器所有溫度的最小值,以及最小的時間戳
val resultStream = dataStream
.map(data=>(data.id,data.temperature,data.timestamp))
.keyBy(_._1) //按照二進制組的第一個元素(id)分組
// .window(TumblingEventTimeWindows.of(Time.seconds(15))) //滾動時間視窗
// .window(SlidingProcessingTimeWindows.of(Time.seconds(15),Time.seconds(3))) //滑動時間視窗,15秒一個視窗,每次往後劃3秒
// .window(EventTimeSessionWindows.withGap(Time.seconds(15))) //會話視窗,超過15秒算下一個會話
// .countWindow(15) //滾動計數視窗
.timeWindow(Time.seconds(15)) //每15秒統計一次,滾動時間視窗
// .minBy(1) //第二個元素做最小值的統計,如果隻是擷取所有溫度的最小值,直接用這個方法就可以了。。
.reduce((curRes,newData)=>(curRes._1, curRes._2.min(newData._2),newData._3))
resultStream.print()
env.execute()
}
}
//上面reduce代碼如果用這個自定義的方式也是一樣可以實作,效果是一樣的
class MyReducer extends ReduceFunction[SensorReadingTest5]{
override def reduce(t: SensorReadingTest5, t1: SensorReadingTest5): SensorReadingTest5 =
SensorReadingTest5(t.id, t1.timestamp,t.temperature.min(t1.temperature))
}
sensor1,1603766281,1
sensor2,1603766282,42
sensor3,1603766283,43
sensor4,1603766240,40.1
sensor4,1603766284,20
sensor4,1603766249,40.2