天天看點

Flink從入門到真香(12、Flink一大利器-時間視窗)

flink中支援多種視窗,包括:時間視窗,session視窗,統計視窗等等,能想到的基本都可以實作

時間視窗(Time Windows)

最簡單常用的視窗形式是基于時間的視窗,flink支援三種種時間視窗:

第一個: 翻滾時間視窗(tumbling time window)

翻滾時間視窗的視窗是固定的,比如設定一個1分鐘的時間視窗,該時間視窗将隻計算目前1分鐘内的資料,而不會管前1分鐘或後1分鐘的資料。

時間是對齊的,資料不會同時出現在2個視窗内,不會重疊

Flink從入門到真香(12、Flink一大利器-時間視窗)

第二個:滑動時間視窗(sliding time window)

滑動視窗,顧名思義,該時間視窗是滑動的。是以,從概念上講,這裡有兩個方面的概念需要了解:

視窗:需要定義視窗的大小

滑動:需要定義在視窗中滑動的大小,但理論上講滑動的大小不能超過視窗大小

滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗長度和滑動間隔組成

視窗長度是固定的,可以有重疊的部分

Flink從入門到真香(12、Flink一大利器-時間視窗)

第三個: 會話視窗(Session Windows)

由一系列事件組合一個指定時間長度的timeout間隙組成,也就是一段時間沒有接收到新資料就會生成新的視窗

主要特點就是: 時間無對齊

Flink從入門到真香(12、Flink一大利器-時間視窗)
window() 方法接收的輸入參數是一個WindowAssigner

WindowAssigner 負責将每條輸入的資料分發到正确的window中

Flink提供了通用的WindowAssigner
滾動視窗(tumbling window)
滑動視窗(sliding window)
會話視窗(session window)
全局視窗(global window)

建立不同類型的視窗

滾動時間視窗(tumbling time window)
timeWindow(Time.seconds(15))
滑動時間視窗(sliding time window)
.timeWindow(Time.seconds(15),Time.seconds(5))

會話視窗(session window)
.window(EventTimeSessionWindows.withGap(Time.minutes(10))

視窗函數(window function)
window function 定義了要對視窗中收集的資料做的計算操作,可以分為兩類;
增量聚合函數(incrementalggergation functions)
每條資料來了就會進行計算,保持一個簡單的狀态
ReduceFunction, AggregateFunction
全視窗函數(full windowfunctions)
先把視窗所有資料收集起來,等到計算的時候會周遊所有資料
ProcessWindowFunction

其他一些常用的API
.trigger()---------觸發器
定義window什麼時候關閉,觸發計算并輸出結果
.evicotr()---------移除器
定義移除某些資料的邏輯
.allowedLateness()   ------允許處理遲到的資料
.sideOutputLateData() -----将遲到的資料放入側輸出流
.getSideOutput() ----擷取側輸出流
           

理論說半天其實還是萌的,上個栗子

假設從檔案讀一批資料,每15秒統計一次,擷取視窗内各傳感器所有溫度的最小值,以及最小的時間戳

package com.mafei.apitest

import com.mafei.sinktest.SensorReadingTest5
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time

object WindowTest {
  def main(args: Array[String]): Unit = {
    //建立執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  //以事件時間作為視窗聚合
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)   //以資料進入flink的時間作為視窗時間
//    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //以Flink實際處理時間作為視窗時間
    //如果發現沒有輸出,那可能是因為資料太少,不到15s都處理完成了,可以換成socket或者kafka來進行測試
    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")

    env.setParallelism(1)
    inputStream.print()

    //先轉換成樣例類類型
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") //按照,分割資料,擷取結果
        SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的資料,參數中傳toLong和toDouble是因為預設分割後是字元串類别
      })

    //每15秒統計一次,視窗内各傳感器所有溫度的最小值,以及最小的時間戳
    val resultStream = dataStream
      .map(data=>(data.id,data.temperature,data.timestamp))
      .keyBy(_._1) //按照二進制組的第一個元素(id)分組
//      .window(TumblingEventTimeWindows.of(Time.seconds(15))) //滾動時間視窗
//      .window(SlidingProcessingTimeWindows.of(Time.seconds(15),Time.seconds(3))) //滑動時間視窗,15秒一個視窗,每次往後劃3秒
//      .window(EventTimeSessionWindows.withGap(Time.seconds(15))) //會話視窗,超過15秒算下一個會話
//      .countWindow(15) //滾動計數視窗
      .timeWindow(Time.seconds(15))  //每15秒統計一次,滾動時間視窗
//      .minBy(1)  //第二個元素做最小值的統計,如果隻是擷取所有溫度的最小值,直接用這個方法就可以了。。
      .reduce((curRes,newData)=>(curRes._1, curRes._2.min(newData._2),newData._3))

    resultStream.print()
    env.execute()

  }
}

//上面reduce代碼如果用這個自定義的方式也是一樣可以實作,效果是一樣的
class MyReducer extends ReduceFunction[SensorReadingTest5]{
  override def reduce(t: SensorReadingTest5, t1: SensorReadingTest5): SensorReadingTest5 =
    SensorReadingTest5(t.id, t1.timestamp,t.temperature.min(t1.temperature))
}           
sensor1,1603766281,1
sensor2,1603766282,42
sensor3,1603766283,43
sensor4,1603766240,40.1
sensor4,1603766284,20
sensor4,1603766249,40.2           

繼續閱讀