天天看點

FLINK基礎(115): DS算子與視窗(24)視窗 (9) Allowed Lateness

1 使用遲到元素更新視窗計算結果(Updating Results by Including Late Events)

由于存在遲到的元素,是以已經計算出的視窗結果是不準确和不完全的。我們可以使用遲到元素更新已經計算完的視窗結果。

如果我們要求一個operator支援重新計算和更新已經發出的結果,就需要在第一次發出結果以後也要儲存之前所有的狀态。但顯然我們不能一直儲存所有的狀态,肯定會在某一個時間點将狀态清空,而一旦狀态被清空,結果就再也不能重新計算或者更新了。而遲到的元素隻能被抛棄或者發送到側輸出流。

window operator API提供了方法來明确聲明我們要等待遲到元素。當使用event-time window,我們可以指定一個時間段叫做allowed lateness。window operator如果設定了allowed lateness,這個window operator在水位線沒過視窗結束時間時也将不會删除視窗和視窗中的狀态。視窗會在一段時間内(allowed lateness設定的)保留所有的元素。

當遲到元素在allowed lateness時間内到達時,這個遲到元素會被實時處理并發送到觸發器(trigger)。當水位線沒過了視窗結束時間+allowed lateness時間時,視窗會被删除,并且所有後來的遲到的元素都會被丢棄。

package com.atguigu

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AllowedLateTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env.socketTextStream("Linux1", 9999, '\n')
    val s = stream
      .map(line => {
        val arr = line.split(" ")
        (arr(0), arr(1).toLong * 1000)
      })
//      .assignAscendingTimestamps(_._2)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
        override def extractTimestamp(element: (String, Long)): Long = element._2
      })
      .keyBy(_._1)
      // [0,5),...
      .timeWindow(Time.seconds(5))
      // 水位線超過 視窗結束時間 視窗閉合計算,但不銷毀
      // 水位線超過 視窗結束時間 + allowed lateness,視窗更新結果并銷毀
      .allowedLateness(Time.seconds(5))
      .process(new MyAllowedLateProcess)
    s.print()
    env.execute()
  }
  class MyAllowedLateProcess extends ProcessWindowFunction[(String, Long),
    String, String,TimeWindow] {
    override def process(key: String,
                         context: Context,
                         elements: Iterable[(String, Long)],
                         out: Collector[String]): Unit = {
      lazy val isUpdate = getRuntimeContext.getState(
        new ValueStateDescriptor[Boolean]("update", Types.of[Boolean])
      )
      if (!isUpdate.value()) {
        out.collect("在水位線超過視窗結束時間的時候,視窗第一次閉合計算")
        isUpdate.update(true)
      } else {
        out.collect("遲到元素來了以後,更新視窗閉合計算的結果")
      }
    }
  }
}      

2 将遲到資料發送到另外一個流

如果想對這些遲到資料處理,我們可以使用Flink的側輸出(Side Output)功能,将遲到資料發到某個特定的流上。後續我們可以根據業務邏輯的要求,對遲到的資料流進行處理。

樣例:

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;

SingleOutputStreamOperator<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);      
// 資料流有三個字段:(key, 時間戳, 數值)
val input: DataStream[(String, Long, Int)] = ...

val mainStream = input.keyBy(item => item._1)
        .timeWindow(Time.seconds(5))
        // 将輸出寫到late-elements裡
        .sideOutputLateData(new OutputTag[(String, Long, Int)]("late-elements"))
        .aggregate(new CountAggregate)

// 接受late-elements,形成一個資料流
val lateStream: DataStream[(String, Long, Int)] = mainStream.getSideOutput(new OutputTag[(String, Long, Int)]("late-elements"))      

上面的代碼将遲到的内容寫進名為“late-elements”的​

​OutputTag​

​下,之後使用​

​getSideOutput​

​擷取這些遲到的資料。