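Table of contents
- First, the difference between windowAll and keyBy.window
- A custom SourceFunction for testing
- Time windows
  - Tumbling time windows (two ways to configure)
  - Sliding time windows (two ways to configure)
- Count windows
  - Tumbling count windows
  - Sliding count windows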
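First, the difference between windowAll and keyBy.window:
(1) windowAll sends every record to a single slot for processing, so its parallelism is always 1.
(2) keyBy distributes records across different slots by key, so the parallelism of keyBy(...).window(...) can be configured.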
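The routing difference can be pictured with a small plain-Scala sketch that does not depend on Flink. The helper names (`subtaskFor`, `routeByKey`, `routeAll`) are hypothetical, and the hash-modulo rule is a deliberate simplification: Flink's actual assignment goes through key groups rather than a plain `hashCode` modulo.

```scala
object KeyPartitionSketch {
  // Simplified stand-in for key-to-subtask routing (assumption: plain hash modulo,
  // not Flink's real key-group mechanism).
  def subtaskFor(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // keyBy-style routing: records with the same key always land on the same subtask.
  def routeByKey(records: Seq[(String, Int)], parallelism: Int): Map[Int, Seq[(String, Int)]] =
    records.groupBy { case (key, _) => subtaskFor(key, parallelism) }

  // windowAll-style routing: every record goes to the single subtask 0.
  def routeAll(records: Seq[(String, Int)]): Map[Int, Seq[(String, Int)]] =
    Map(0 -> records)

  def main(args: Array[String]): Unit = {
    val records = Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4))
    println(routeAll(records))
    println(routeByKey(records, parallelism = 3))
  }
}
```

In this sketch both `("a", 1)` and `("a", 3)` end up on the same subtask, which is what allows a keyed window to aggregate per key while still running in parallel.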
A custom SourceFunction for testing
package com.fouth_sink

import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import scala.util.Random

/**
 * Custom SourceFunction: emits one random (key, value) pair per second.
 */
class CustomSourceFunction extends RichSourceFunction[(String, Int)] {
  // @volatile because cancel() is called from a different thread than run()
  @volatile var flag = true

  override def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
    val arr: Array[String] = Array("a", "b", "c", "d", "e", "f", "g")
    val random: Random = new Random()
    while (flag) {
      Thread.sleep(1000)
      // Pick a random key from the array
      val key: String = arr(random.nextInt(arr.length))
      // Random value in [0, 10)
      val rightNow: Int = random.nextInt(10)
      ctx.collect((key, rightNow))
    }
  }

  // Stop the emit loop when the job is cancelled
  override def cancel(): Unit = {
    flag = false
  }
}
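Time windows
Tumbling time windows (two ways to configure)
A tumbling window splits the stream into slices of a fixed duration or fixed size. The windows do not overlap, so each element belongs to exactly one window.
Strengths: simple, and well suited to computing a metric over a fixed window size and period.
Drawback: records that are related to each other but fall on opposite sides of a window boundary are computed separately, so results that depend on that relationship can be wrong.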
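To make the boundary behavior concrete, here is a plain-Scala sketch of how a timestamp can be mapped to its tumbling window. It assumes windows aligned to epoch 0 with no offset and a half-open interval [start, end); `TumblingAssignSketch`, `windowStart`, and `window` are hypothetical names, not Flink API.

```scala
object TumblingAssignSketch {
  // Start of the tumbling window that contains timestampMs
  // (assumption: windows are aligned to epoch 0, no offset).
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - Math.floorMod(timestampMs, sizeMs)

  // The window as a half-open interval [start, end).
  def window(timestampMs: Long, sizeMs: Long): (Long, Long) = {
    val start = windowStart(timestampMs, sizeMs)
    (start, start + sizeMs)
  }

  def main(args: Array[String]): Unit = {
    val size = 10000L // 10 seconds, as in Time.seconds(10)
    println(window(9000L, size))
    println(window(11000L, size))
  }
}
```

With a 10-second window, the events at 9000 ms and 11000 ms are only two seconds apart, yet they fall into [0, 10000) and [10000, 20000) respectively. This is the drawback described above: related records on either side of a boundary are aggregated separately.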
Method 1: timeWindow(Time.seconds(10))
package com.windowprogram

import com.fouth_sink.CustomSourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Tumbling window computation
 */
object TumblingWindow1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Custom SourceFunction, convenient for testing
    val customSourceFunction: CustomSourceFunction = new CustomSourceFunction
    val kafkaDS: DataStream[(String, Int)] = env.addSource(customSourceFunction)
    // Print the raw stream so it can be compared with the window output
    kafkaDS.print("streamPPP")
    // keyBy first, then open the window
    val resultDS: DataStream[(String, Int)] = kafkaDS
      .keyBy(0)
      // Also available: Time.milliseconds(x), Time.seconds(x), Time.minutes(x).
      // Note: timeWindow is deprecated as of Flink 1.12 in favor of window(...).
      .timeWindow(Time.seconds(10))
      // Sum the values of each key within the window
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    resultDS.print("stream")
    env.execute()
  }
}
Method 2: pass a window assigner explicitly
window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
window(TumblingEventTimeWindows.of(Time.seconds(10)))
package com.windowprogram

import com.fouth_sink.CustomSourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Tumbling window computation
 */
object TumblingWindow2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Custom SourceFunction, convenient for testing
    val customSourceFunction: CustomSourceFunction = new CustomSourceFunction
    val kafkaDS: DataStream[(String, Int)] = env.addSource(customSourceFunction)
    // Print the raw stream so it can be compared with the window output
    kafkaDS.print("streamPPP")
    // keyBy first, then open the window
    val resultDS: DataStream[(String, Int)] = kafkaDS
      .keyBy(0)
      // .timeWindow(Time.seconds(10)) // Method 1; also Time.milliseconds(x), Time.minutes(x)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // Method 2
      // Sum the values of each key within the window
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    resultDS.print("stream")
    env.execute()
  }
}
Sliding time windows (two ways to configure)
Method 1: timeWindow(Time.seconds(10), Time.seconds(5))
package com.windowprogram

import com.fouth_sink.CustomSourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Sliding time window
 */
object SlidingWindow1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val customSourceFunction: CustomSourceFunction = new CustomSourceFunction
    val ds: DataStream[(String, Int)] = env.addSource(customSourceFunction)
    ds.print("streamPP")
    val resultDS: DataStream[(String, Int)] = ds.keyBy(0)
      // Also available: Time.milliseconds(x), Time.seconds(x), Time.minutes(x)
      .timeWindow(Time.seconds(10), Time.seconds(5)) // window size, slide interval
      // Sum the values of each key within the window
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    resultDS.print("stream")
    env.execute()
  }
}
Method 2: window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
package com.windowprogram

import com.fouth_sink.CustomSourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Sliding time window
 */
object SlidingWindow2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val customSourceFunction: CustomSourceFunction = new CustomSourceFunction
    val ds: DataStream[(String, Int)] = env.addSource(customSourceFunction)
    ds.print("streamPP")
    val resultDS: DataStream[(String, Int)] = ds.keyBy(0)
      // Also available: Time.milliseconds(x), Time.seconds(x), Time.minutes(x)
      // .timeWindow(Time.seconds(10), Time.seconds(5)) // Method 1: window size, slide interval
      .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) // Method 2
      // Sum the values of each key within the window
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    resultDS.print("stream")
    env.execute()
  }
}
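Unlike tumbling windows, sliding windows overlap, so one element can be counted in several windows. The plain-Scala sketch below lists the windows a timestamp falls into for a given size and slide. It assumes windows aligned to epoch 0 with no offset and half-open intervals [start, end); `SlidingAssignSketch` and `windows` are hypothetical names, not Flink API.

```scala
object SlidingAssignSketch {
  // All sliding windows [start, start + sizeMs) that contain timestampMs
  // (assumption: window starts are multiples of slideMs, no offset).
  def windows(timestampMs: Long, sizeMs: Long, slideMs: Long): List[(Long, Long)] = {
    // Latest window start that is not after the timestamp.
    val lastStart = timestampMs - Math.floorMod(timestampMs, slideMs)
    Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(start => start > timestampMs - sizeMs) // window must still cover the timestamp
      .map(start => (start, start + sizeMs))
      .toList
  }

  def main(args: Array[String]): Unit = {
    // size = 10 s, slide = 5 s, as in the examples above
    println(windows(12000L, 10000L, 5000L))
  }
}
```

With a 10-second size and a 5-second slide, the element at 12000 ms belongs to both [10000, 20000) and [5000, 15000). In general an element lands in size / slide windows, which is why the same record contributes to more than one result in the sliding examples.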
Count windows
Tumbling count windows
package com.windowprogram

import com.fouth_sink.CustomSourceFunction
import org.apache.flink.streaming.api.scala._

/**
 * Tumbling count window.
 * After grouping by key, the computation runs as soon as a key has collected the configured number of elements.
 */
object TumblingCountWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val customSourceFunction: CustomSourceFunction = new CustomSourceFunction
    val ds: DataStream[(String, Int)] = env.addSource(customSourceFunction)
    ds.print("streamPP")
    val resultDS: DataStream[(String, Int)] = ds.keyBy(0)
      .countWindow(5) // fire once a key has 5 elements
      // Sum the values of each key within the window
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    resultDS.print("stream")
    env.execute()
  }
}
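The per-key counting rule in the comment above can be simulated in plain Scala without Flink. This is only a sketch of the behavior: `TumblingCountSketch` and `run` are hypothetical names, and the input is a fixed list rather than a stream.

```scala
import scala.collection.mutable

object TumblingCountSketch {
  // Emits (key, sum) each time a key has accumulated exactly `size` elements,
  // then starts a fresh window for that key.
  def run(records: Seq[(String, Int)], size: Int): Seq[(String, Int)] = {
    val buffers = mutable.Map.empty[String, Vector[Int]]
    val out = Vector.newBuilder[(String, Int)]
    for ((key, value) <- records) {
      val buf = buffers.getOrElse(key, Vector.empty[Int]) :+ value
      if (buf.size == size) {
        out += ((key, buf.sum)) // window is full: emit and reset
        buffers.remove(key)
      } else {
        buffers(key) = buf      // window still filling: keep buffering
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val records = Seq(("a", 1), ("b", 10), ("a", 2), ("a", 3), ("b", 20))
    println(run(records, size = 3))
  }
}
```

With a size of 3, only key "a" produces a result here. Key "b" has received two elements and emits nothing, which matches what the streaming job shows: a key that never reaches the configured count never prints a window result.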
Sliding count windows
package com.windowprogram

import com.fouth_sink.CustomSourceFunction
import org.apache.flink.streaming.api.scala._

/**
 * Sliding count window
 */
object SlidingWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val customSourceFunction: CustomSourceFunction = new CustomSourceFunction
    val ds: DataStream[(String, Int)] = env.addSource(customSourceFunction)
    val resultDS: DataStream[(String, Int)] = ds.keyBy(0)
      .countWindow(5, 3) // window size 5, sliding every 3 elements
      // Sum the values of each key within the window
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    resultDS.print("stream")
    env.execute()
  }
}
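The behavior of countWindow(5, 3) is easy to misread, so here is a plain-Scala simulation. It assumes the semantics as I understand them: for each key, a result fires every `slide` elements and is computed over at most the last `size` elements of that key. `SlidingCountSketch` and `run` are hypothetical names, not Flink API.

```scala
import scala.collection.mutable

object SlidingCountSketch {
  // For each key: fire every `slide` elements, summing the most recent `size` values.
  def run(records: Seq[(String, Int)], size: Int, slide: Int): Seq[(String, Int)] = {
    // Per key: (number of elements seen so far, the most recent `size` values).
    val state = mutable.Map.empty[String, (Long, Vector[Int])]
    val out = Vector.newBuilder[(String, Int)]
    for ((key, value) <- records) {
      val (seen, recent) = state.getOrElse(key, (0L, Vector.empty[Int]))
      val updated = (recent :+ value).takeRight(size) // keep at most `size` values
      state(key) = (seen + 1, updated)
      if ((seen + 1) % slide == 0) out += ((key, updated.sum))
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val records = (1 to 7).map(i => ("a", i))
    println(run(records, size = 5, slide = 3))
  }
}
```

For key "a" with values 1 to 7, the first result fires after the third element and covers only 1, 2, 3 (sum 6), because fewer than five elements exist yet. The second fires after the sixth element and covers 2 through 6 (sum 20). The seventh element produces no output on its own.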