天天看點

Flink核心概念之window:windowAll跟keyBy.window的差別、自定義SourceFunction用于測試、時間視窗、計數視窗

文章目錄

  • 首先解釋一下windowAll跟keyBy.window的差別:
  • 自定義SourceFunction用于測試
  • 時間視窗
    • 滾動時間視窗(兩種設定方式)
    • 滑動時間視窗(兩種設定方式)
  • 計數視窗
    • 滾動計數視窗
    • 滑動計數視窗

首先解釋一下windowAll跟keyBy.window的差別:

(1)windowAll就是把所有資料弄到一個slot處理,并行度始終為1

(2)keyBy會把資料分到不同的slot,keyBy.*window可以設定并行度

自定義SourceFunction用于測試

package com.fouth_sink

import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

import scala.util.Random

/**
  * Custom source function used for testing the window examples.
  *
  * Emits one `(key, value)` pair roughly every second until [[cancel]] is called.
  * The key is picked at random from "a".."g" and the value is a random Int in [0, 10).
  */
class CustomSourceFunction extends RichSourceFunction[(String, Int)]{

  // Loop guard for run(). cancel() is invoked from a different thread than the one
  // executing run(), so the flag must be @volatile for the write to become visible
  // to the emitting loop; without it the source may never stop.
  @volatile var flag = true

  /**
    * Emits random pairs through `ctx` until `flag` is cleared.
    *
    * @param ctx source context used to emit the generated elements
    */
  override def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
    val arr: Array[String] = Array("a", "b", "c", "d", "e", "f", "g")
    val random: Random = new Random()
    while (flag) {
      // Throttle to about one element per second.
      Thread.sleep(1000)
      // Pick a random key from the array.
      val key: String = arr(random.nextInt(arr.length))
      val rightNow: Int = random.nextInt(10)
      ctx.collect((key, rightNow))
    }
  }

  /** Requests the emitting loop in [[run]] to stop after its current iteration. */
  override def cancel(): Unit = {
    flag = false
  }
}
           

時間視窗

滾動時間視窗(兩種設定方式)

根據固定時間或固定大小進行切分,視窗和視窗之間的元素互不重疊

特點:比較簡單,适用于按照固定大小和周期統計某一指標的這種類型的視窗計算

缺點:可能導緻某些有前後關系的資料計算結果不正确

timeWindow(Time.seconds(10))

package com.windowprogram

import com.fouth_sink.CustomSourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Tumbling time window example using the `timeWindow(size)` shorthand.
  *
  * Reads `(key, value)` pairs from [[CustomSourceFunction]], groups them by key and
  * sums the values of each key within every 10-second tumbling window.
  */
object TumblingWindow1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Custom SourceFunction that makes local testing easy.
    val customSourceFunction: CustomSourceFunction = new CustomSourceFunction
    val sourceDS: DataStream[(String, Int)] = env.addSource(customSourceFunction)
    // Print the raw elements so they can be compared with the window results.
    sourceDS.print("streamPPP")
    // keyBy first, then open the window.
    val resultDS: DataStream[(String, Int)] = sourceDS
      // Typed key selector instead of the deprecated positional keyBy(0):
      // the key type is String rather than an untyped Java Tuple.
      .keyBy(_._1)
      .timeWindow(Time.seconds(10)) // Time.milliseconds(x), Time.seconds(x), Time.minutes(x)
      // Keep the key and add up the values inside the window.
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    resultDS.print("stream")
    env.execute()
  }
}
           

window(TumblingProcessingTimeWindows.of(Time.seconds(10)))

window(TumblingEventTimeWindows.of(Time.seconds(10)))

package com.windowprogram
import com.fouth_sink.CustomSourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
/**
  * Tumbling time window example using an explicit window assigner.
  *
  * Reads `(key, value)` pairs from [[CustomSourceFunction]], groups them by key and
  * sums the values of each key within every 10-second tumbling processing-time window.
  */
object TumblingWindow2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Custom SourceFunction that makes local testing easy.
    val customSourceFunction: CustomSourceFunction = new CustomSourceFunction
    val sourceDS: DataStream[(String, Int)] = env.addSource(customSourceFunction)
    // Print the raw elements so they can be compared with the window results.
    sourceDS.print("streamPPP")
    // keyBy first, then open the window.
    val resultDS: DataStream[(String, Int)] = sourceDS
      // Typed key selector instead of the deprecated positional keyBy(0):
      // the key type is String rather than an untyped Java Tuple.
      .keyBy(_._1)
      // Option 1 (shorthand): .timeWindow(Time.seconds(10))
      //   accepts Time.milliseconds(x), Time.seconds(x), Time.minutes(x)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // Option 2
      // Keep the key and add up the values inside the window.
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    resultDS.print("stream")
    env.execute()
  }
}
           

滑動時間視窗(兩種設定方式)

timeWindow(Time.seconds(10), Time.seconds(5))

package com.windowprogram

import com.fouth_sink.CustomSourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Sliding time window example using the `timeWindow(size, slide)` shorthand.
  *
  * Reads `(key, value)` pairs from [[CustomSourceFunction]], groups them by key and
  * sums the values of each key over a 10-second window that slides every 5 seconds.
  */
object SlidingWindow1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val customSourceFunction: CustomSourceFunction = new CustomSourceFunction

    val ds: DataStream[(String, Int)] = env.addSource(customSourceFunction)
    // Print the raw elements so they can be compared with the window results.
    ds.print("streamPP")
    val resultDS: DataStream[(String, Int)] = ds
      // Typed key selector instead of the deprecated positional keyBy(0):
      // the key type is String rather than an untyped Java Tuple.
      .keyBy(_._1)
      // Accepts Time.milliseconds(x), Time.seconds(x), Time.minutes(x).
      .timeWindow(Time.seconds(10), Time.seconds(5)) // window size, slide interval
      // Keep the key and add up the values inside the window.
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    resultDS.print("stream")

    env.execute()
  }
}
           

window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

package com.windowprogram

import com.fouth_sink.CustomSourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Sliding time window example using an explicit window assigner.
  *
  * Reads `(key, value)` pairs from [[CustomSourceFunction]], groups them by key and
  * sums the values of each key over a 10-second processing-time window that slides
  * every 5 seconds.
  */
object SlidingWindow2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val customSourceFunction: CustomSourceFunction = new CustomSourceFunction

    val ds: DataStream[(String, Int)] = env.addSource(customSourceFunction)
    // Print the raw elements so they can be compared with the window results.
    ds.print("streamPP")
    val resultDS: DataStream[(String, Int)] = ds
      // Typed key selector instead of the deprecated positional keyBy(0):
      // the key type is String rather than an untyped Java Tuple.
      .keyBy(_._1)
      // Option 1 (shorthand): .timeWindow(Time.seconds(10), Time.seconds(5))
      //   arguments are window size and slide interval;
      //   accepts Time.milliseconds(x), Time.seconds(x), Time.minutes(x)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) // Option 2
      // Keep the key and add up the values inside the window.
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    resultDS.print("stream")

    env.execute()
  }
}

           

計數視窗

滾動計數視窗

package com.windowprogram

import com.fouth_sink.CustomSourceFunction
import org.apache.flink.streaming.api.scala._

/**
  * Tumbling count window example.
  *
  * After grouping by key, the window for a key fires as soon as that key has
  * collected the configured number of elements (5 here); the values are then summed.
  */
object TumblingCountWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val customSourceFunction: CustomSourceFunction = new CustomSourceFunction

    val ds: DataStream[(String, Int)] = env.addSource(customSourceFunction)

    // Print the raw elements so they can be compared with the window results.
    ds.print("streamPP")

    val resultDS: DataStream[(String, Int)] = ds
      // Typed key selector instead of the deprecated positional keyBy(0):
      // the key type is String rather than an untyped Java Tuple.
      .keyBy(_._1)
      .countWindow(5)
      // Keep the key and add up the values inside the window.
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2))

    resultDS.print("stream")

    env.execute()
  }
}

           

滑動計數視窗

package com.windowprogram

import com.fouth_sink.CustomSourceFunction
import org.apache.flink.streaming.api.scala._

/**
  * Sliding count window example.
  *
  * Reads `(key, value)` pairs from [[CustomSourceFunction]], groups them by key and
  * sums the values of each key over a count window of size 5 that slides by 3.
  */
object SlidingWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val customSourceFunction: CustomSourceFunction = new CustomSourceFunction

    val ds: DataStream[(String, Int)] = env.addSource(customSourceFunction)

    val resultDS: DataStream[(String, Int)] = ds
      // Typed key selector instead of the deprecated positional keyBy(0):
      // the key type is String rather than an untyped Java Tuple.
      .keyBy(_._1)
      .countWindow(5, 3) // window size is 5, slide interval is 3
      // Keep the key and add up the values inside the window.
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2))

    resultDS.print("stream")

    env.execute()
  }
}

           

繼續閱讀