天天看點

Flink Time chaplinthink

基礎概念

支援三種時間概念:

  • Processing Time 時間遞增
  • Ingestion Time : 攝入時間,資料進入Flink架構的時間,在Source Operator中設定,每個事件拿到目前時間作為時間戳,後續的時間視窗基于該時間
  • Event Time 支援一定程度的亂序

    上一個 checkpoint 或者 savepoint 進行重放,是不是希望結果完全相同。如果希望結果完全相同,就隻能用 Event Time;如果接受結果不同,則可以用 Processing Time。

watermark

一個watermark 代表了 watermark所包含的timestamp 數值,表示後來的資料已經再也沒有小于或等于這個時間的了.

Flink 支援兩種 watermark 生成方式:

  • 在SourceFunction中産生

collectWithTimestamp 方法發送一條資料

第一個參數就是我們要發送的資料

第二個參數就是這個資料所對應的時間戳

emitWatermark 去産生一條 watermark: 表示接下來不會再有時間戳小于等于這個數值記錄

  • 在使用DataStream API 的時候指定
DataStream.assignTimestampsAndWatermarks
           

建議生成的工作越靠近 DataSource 越好。這樣會友善讓程式邏輯裡面更多的 operator 去判斷某些資料是否亂序。

code demo:

object WaterMakerTest {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        val dataStream: DataStream[Order] = env.socketTextStream("localhost", 9999).map(item => {
            val itemArray = item.split(",")
            Order(itemArray(0).toLong, itemArray(1), itemArray(2).toDouble)
        })

        val outputStream: DataStream[Order] = dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(0)) {
            override def extractTimestamp(element: Order): Long = element.timestamp * 1000L
        }).keyBy("category").timeWindow(Time.seconds(5)).apply(new MyWindowFunction)

        dataStream.print("Data")
        outputStream.print("Result")

        env.execute()
    }
}

class MyWindowFunction extends WindowFunction[Order, Order, Tuple, TimeWindow] {
    override def apply(key: Tuple, window: TimeWindow, input: Iterable[Order], out: Collector[Order]): Unit = {
        val timestamp = window.maxTimestamp()
        var sum: Double = 0
        for (elem <- input) {
            sum += elem.price
        }

        val category = key.asInstanceOf[Tuple1[String]].f0
        out.collect(Order(timestamp, category, sum))
    }
}


case class Order(timestamp: Long, category: String, price: Double)
           

總結

主要了解Flink的時間概念以及Watermark的作用,它可以處理亂序資料,通過watermark來定義關窗的時間點. 可以在SourceFunction和DataStream API 指定生成 Watermark.

作者:chaplinthink

出處:https://www.cnblogs.com/bigdata1024/p/16271916.html

本文以學習、研究和分享為主,如需轉載,請聯系本人,标明作者和出處,非商業用途!

繼續閱讀