1 使用遲到元素更新視窗計算結果(Updating Results by Including Late Events)
由于存在遲到的元素,是以已經計算出的視窗結果是不準确和不完全的。我們可以使用遲到元素更新已經計算完的視窗結果。
如果我們要求一個operator支援重新計算和更新已經發出的結果,就需要在第一次發出結果以後也要儲存之前所有的狀态。但顯然我們不能一直儲存所有的狀态,肯定會在某一個時間點将狀态清空,而一旦狀态被清空,結果就再也不能重新計算或者更新了。而遲到的元素隻能被抛棄或者發送到側輸出流。
window operator API提供了方法來明确聲明我們要等待遲到元素。當使用event-time window,我們可以指定一個時間段叫做allowed lateness。window operator如果設定了allowed lateness,這個window operator在水位線沒過視窗結束時間時也将不會删除視窗和視窗中的狀态。視窗會在一段時間内(allowed lateness設定的)保留所有的元素。
當遲到元素在allowed lateness時間内到達時,這個遲到元素會被實時處理并發送到觸發器(trigger)。當水位線沒過了視窗結束時間+allowed lateness時間時,視窗會被删除,并且所有後來的遲到的元素都會被丢棄。
package com.atguigu
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object AllowedLateTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = env.socketTextStream("Linux1", 9999, '\n')
val s = stream
.map(line => {
val arr = line.split(" ")
(arr(0), arr(1).toLong * 1000)
})
// .assignAscendingTimestamps(_._2)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
override def extractTimestamp(element: (String, Long)): Long = element._2
})
.keyBy(_._1)
// [0,5),...
.timeWindow(Time.seconds(5))
// 水位線超過 視窗結束時間 視窗閉合計算,但不銷毀
// 水位線超過 視窗結束時間 + allowed lateness,視窗更新結果并銷毀
.allowedLateness(Time.seconds(5))
.process(new MyAllowedLateProcess)
s.print()
env.execute()
}
class MyAllowedLateProcess extends ProcessWindowFunction[(String, Long),
String, String,TimeWindow] {
override def process(key: String,
context: Context,
elements: Iterable[(String, Long)],
out: Collector[String]): Unit = {
lazy val isUpdate = getRuntimeContext.getState(
new ValueStateDescriptor[Boolean]("update", Types.of[Boolean])
)
if (!isUpdate.value()) {
out.collect("在水位線超過視窗結束時間的時候,視窗第一次閉合計算")
isUpdate.update(true)
} else {
out.collect("遲到元素來了以後,更新視窗閉合計算的結果")
}
}
}
}
2 将遲到資料發送到另外一個流
如果想對這些遲到資料處理,我們可以使用Flink的側輸出(Side Output)功能,将遲到資料發到某個特定的流上。後續我們可以根據業務邏輯的要求,對遲到的資料流進行處理。
樣例:
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
// 資料流有三個字段:(key, 時間戳, 數值)
val input: DataStream[(String, Long, Int)] = ...
val mainStream = input.keyBy(item => item._1)
.timeWindow(Time.seconds(5))
// 将輸出寫到late-elements裡
.sideOutputLateData(new OutputTag[(String, Long, Int)]("late-elements"))
.aggregate(new CountAggregate)
// 接受late-elements,形成一個資料流
val lateStream: DataStream[(String, Long, Int)] = mainStream.getSideOutput(new OutputTag[(String, Long, Int)]("late-elements"))
上面的代碼将遲到的内容寫進名為“late-elements”的
OutputTag
下,之後使用
getSideOutput
擷取這些遲到的資料。