天天看點

(17) flink時間語義

文章目錄

      • 時間(Time)語義
      • 水位線(Watermark)
        • 特點
        • watermark 的傳遞
        • 引入
      • 代碼詳解
        • 直接設定延遲時間
      • 自定義周期性生成waterMark
      • 自定義間斷生成waterMark

(17) flink時間語義

Event Time:事件建立的時間

Ingestion Time:資料進入Flink的時間

Processing Time:執行操作算子的本地系統時間,與機器相關

一般設定時間戳為Event Time,預設是Processing Time

在代碼中設定 Event Time

(17) flink時間語義

具體的時間,還需要從資料中提取時間戳(timestamp),配置設定時間戳越資料源越好,比如樣例類處理資料後

assignAscendingTimestamps 設定一個升序的時間戳

當 Flink 以 Event Time 模式處理資料流時,它會根據資料裡的時間戳來處理基于時間的算子,由于網絡、分布式等原因,會導緻亂序資料的産生,造成計算不準确.

Watermark 是一種衡量 Event Time 進展的機制,可以設定延遲觸發,Watermark 是用于處理亂序事件的,而正确的處理亂序事件,通常用Watermark 機制結合 window 來實作;資料流中的 Watermark 用于表示 timestamp 小于 Watermark 的資料,都已經到達了,是以,window 的執行也是由 Watermark 觸發的。watermark 用來讓程式自己平衡延遲和結果正确性

(17) flink時間語義

watermark 是一條特殊的資料記錄,watermark 必須單調遞增,以確定任務的事件時間時鐘在向前推進,而不是在後退,watermark 與資料的時間戳相關

(17) flink時間語義

Event Time 的使用一定要指定資料源中的時間戳,對于排好序的資料,隻需要指定時間戳就夠了,不需要延遲觸發

(17) flink時間語義

Event Time 的使用一定要指定資料源中的時間戳,調用 assignTimestampAndWatermarks 方法,傳入一個 BoundedOutOfOrdernessTimestampExtractor,就可以指定 watermark,對于排好序的資料,不需要延遲觸發,可以隻指定時間戳就行了

Flink 暴露了 TimestampAssigner 接口供我們實作,使我們可以自定義如何從事件資料中抽取時間戳和生成watermark

(17) flink時間語義

MyAssigner 可以有兩種類型,都繼承自 TimestampAssigner

定義了抽取時間戳,以及生成 watermark 的方法,有兩種類型

AssignerWithPeriodicWatermarks

周期性的生成 watermark:系統會周期性的将 watermark 插入到流中

預設周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法進行設定

升序和前面亂序的處理 BoundedOutOfOrderness ,都是基于周期性 watermark 的。

package study

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/*
樣例資料
sensor_1, 1547718199, 35.80018327300259
sensor_6, 1547718201, 15.402984393403084
sensor_7, 1547718202, 6.720945201171228
sensor_10, 1547718205, 38.101067604893444
sensor_1, 1547718206, 35.1
sensor_1, 1547718299, 31.0
 */
object WindowTEst {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //設定時間為事件發生時間
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env.socketTextStream("note01", 7777)

   stream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })
      .assignAscendingTimestamps(_.timeStamp * 1000) //延遲為目前時間 * 1000
      .map(data => (data.id, data.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5)) // 開時間視窗
      .reduce((data1, data2) => (data1._1, data1._2.min(data2._2))).print() // 用reduce做增量聚合
    env.execute();
  }
}

case class SensorReading(id: String, timeStamp: Long, temperature: Double)
           

package study

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time

/*
sensor_1, 1547718199, 35.80018327300259
sensor_6, 1547718201, 15.402984393403084
sensor_7, 1547718202, 6.720945201171228
sensor_10, 1547718205, 38.101067604893444
sensor_1, 1547718206, 35.1
sensor_1, 1547718299, 31.0
 */
object WindowTEst {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //設定時間為事件發生時間
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env.socketTextStream("note01", 7777)

    stream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })

      //      .assignAscendingTimestamps(_.timeStamp * 1000) //延遲為目前時間 * 1000
      .assignTimestampsAndWatermarks(new MyAssigner()) //自定義的周期性watermark
      .map(data => (data.id, data.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5)) // 開時間視窗
      .reduce((data1, data2) => (data1._1, data1._2.min(data2._2))).print() // 用reduce做增量聚合
    env.execute();
  }
}

case class SensorReading(id: String, timeStamp: Long, temperature: Double)

class MyAssigner() extends AssignerWithPeriodicWatermarks[SensorReading] {
  //儲存目前最大的事件戳
  var maxTs = Long.MinValue
  //定義最大的亂序時間
  val bound = 1000L


  //周期間隔:預設200ms,将watermark注入到目前流中
  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
  }

  override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
    maxTs = maxTs.max(element.timeStamp * 1000L)
    element.timeStamp * 1000
  }
}
           

class MyAssigner() extends AssignerWithPunctuatedWatermarks[SensorReading]{
  override def checkAndGetNextWatermark(lastElement: SensorReading, extractedTimestamp: Long): Watermark = new Watermark(extractedTimestamp)

  override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = element.timestamp * 1000
}