Note: This blog series is compiled from SGG's video tutorials and is well suited for beginners.
Using Connected Broadcast State
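A common requirement is for a streaming application to distribute the same events to all parallel instances of an operator, and this distribution must be recoverable after a failure.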
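Consider an example with two streams: one carries rules (for example, "two consecutive temperatures above a threshold within 5 seconds"), and the other carries the events to be matched against those rules; in other words, a rule stream and an event stream. Because events are partitioned across the parallel instances and any instance may need to evaluate any rule, each parallel instance of the operator must keep the rules in its operator state. That is, the rule stream has to be broadcast to all parallel instances.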
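To make the difference between partitioning and broadcasting concrete, here is a minimal plain-Scala sketch (not Flink's API) of how records reach the parallel instances of an operator. `Routing`, `keyedTarget`, and `broadcastTargets` are hypothetical names, and routing by `hashCode` modulo the parallelism is a simplifying assumption; Flink's keyBy actually maps keys to key groups first. A keyed event lands on exactly one instance, while a broadcast rule lands on all of them.

```scala
// Simplified model (not Flink's API): which parallel instances of a
// downstream operator receive a given record.
object Routing {
  // keyBy-style routing: every record with the same key goes to exactly
  // one instance. hashCode modulo parallelism is an illustrative
  // assumption; Flink hashes keys into key groups instead.
  def keyedTarget(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // Broadcast routing: every record goes to every instance, so each
  // instance can keep a full copy of the rules in its state.
  def broadcastTargets(parallelism: Int): Seq[Int] =
    0 until parallelism
}
```

For example, `Routing.broadcastTargets(4)` yields the indices 0 through 3, whereas `Routing.keyedTarget("sensor_1", 4)` yields a single index in that range.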
在Flink中,這樣的狀态叫做廣播狀态(broadcast state)。廣播狀态和DataStream或者KeyedStream都可以做連接配接操作。
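The following example implements a temperature alert application whose thresholds can be set dynamically; the new thresholds are delivered through a broadcast stream.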
```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.scala._

// SensorReading and ThresholdUpdate are assumed to be defined elsewhere
val sensorData: DataStream[SensorReading] = ...
val thresholds: DataStream[ThresholdUpdate] = ...
val keyedSensorData: KeyedStream[SensorReading, String] = sensorData
  .keyBy(_.id)

// the descriptor of the broadcast state
val broadcastStateDescriptor =
  new MapStateDescriptor[String, Double](
    "thresholds", classOf[String], classOf[Double])

val broadcastThresholds: BroadcastStream[ThresholdUpdate] = thresholds
  .broadcast(broadcastStateDescriptor)

// connect keyed sensor stream and broadcasted rules stream
val alerts: DataStream[(String, Double, Double)] = keyedSensorData
  .connect(broadcastThresholds)
  .process(new UpdatableTemperatureAlertFunction())
```
帶有廣播狀态的函數在應用到兩條流上時分三個步驟:
- 調用DataStream.broadcast()來建立BroadcastStream,定義一個或者多個MapStateDescriptor對象。
- 将BroadcastStream和DataStream/KeyedStream做connect操作。
- 在connected streams上調用KeyedBroadcastProcessFunction/BroadcastProcessFunction。
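What these steps mean for each parallel instance can be modeled in plain Scala. The sketch below is a simplified model, not Flink's implementation: `InstanceState`, `onBroadcast`, `thresholdFor`, and `BroadcastModel.broadcastAll` are hypothetical names, and `ThresholdUpdate` is redeclared here with the `id` and `threshold` fields used in the example. Each instance holds its own copy of the broadcast map; every broadcast element is applied to every copy, and the keyed side only reads its local copy. Flink does not synchronize these copies across instances, so they stay identical only if the update logic is deterministic. For simplicity the model delivers updates to every instance in the same order, which Flink does not guarantee.

```scala
import scala.collection.mutable

// Mirrors the ThresholdUpdate fields used in the example (id, threshold).
final case class ThresholdUpdate(id: String, threshold: Double)

// Hypothetical model of one parallel instance and its broadcast state copy.
final class InstanceState {
  // Local copy of the "thresholds" broadcast state.
  val thresholds: mutable.Map[String, Double] = mutable.Map.empty

  // Same deterministic put/remove rule as processBroadcastElement:
  // a threshold of 0.0 removes the entry, anything else sets it.
  def onBroadcast(update: ThresholdUpdate): Unit =
    if (update.threshold != 0.0d) thresholds.put(update.id, update.threshold)
    else thresholds.remove(update.id)

  // The keyed side only reads the local copy.
  def thresholdFor(id: String): Option[Double] = thresholds.get(id)
}

object BroadcastModel {
  // Every broadcast element is applied to every instance's copy.
  def broadcastAll(instances: Seq[InstanceState], updates: Seq[ThresholdUpdate]): Unit =
    for (u <- updates; inst <- instances) inst.onBroadcast(u)
}
```

After broadcasting `ThresholdUpdate("s1", 5.0)`, `ThresholdUpdate("s2", 3.0)`, and `ThresholdUpdate("s2", 0.0)` to three instances, every copy holds only `s1 -> 5.0`.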
The UpdatableTemperatureAlertFunction below implements the dynamically configurable temperature thresholds.
```scala
import org.apache.flink.api.common.state.{MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

class UpdatableTemperatureAlertFunction()
    extends KeyedBroadcastProcessFunction[String,
      SensorReading, ThresholdUpdate, (String, Double, Double)] {

  // the descriptor of the broadcast state
  // (must match the descriptor passed to broadcast())
  private lazy val thresholdStateDescriptor =
    new MapStateDescriptor[String, Double](
      "thresholds", classOf[String], classOf[Double])

  // the keyed state handle
  private var lastTempState: ValueState[Double] = _

  override def open(parameters: Configuration): Unit = {
    // create keyed state descriptor
    val lastTempDescriptor = new ValueStateDescriptor[Double](
      "lastTemp", classOf[Double])
    // obtain the keyed state handle
    lastTempState = getRuntimeContext
      .getState[Double](lastTempDescriptor)
  }

  override def processBroadcastElement(
      update: ThresholdUpdate,
      ctx: KeyedBroadcastProcessFunction[String,
        SensorReading, ThresholdUpdate,
        (String, Double, Double)]#Context,
      out: Collector[(String, Double, Double)]): Unit = {
    // get the writable broadcast state handle
    val thresholds = ctx
      .getBroadcastState(thresholdStateDescriptor)
    if (update.threshold != 0.0d) {
      // configure a new threshold for the sensor
      thresholds.put(update.id, update.threshold)
    } else {
      // a threshold of 0.0 removes the threshold for the sensor
      thresholds.remove(update.id)
    }
  }

  override def processElement(
      reading: SensorReading,
      readOnlyCtx: KeyedBroadcastProcessFunction[String,
        SensorReading, ThresholdUpdate,
        (String, Double, Double)]#ReadOnlyContext,
      out: Collector[(String, Double, Double)]): Unit = {
    // get the read-only broadcast state
    val thresholds = readOnlyCtx
      .getBroadcastState(thresholdStateDescriptor)
    // check if we have a threshold
    if (thresholds.contains(reading.id)) {
      // get threshold for sensor
      val sensorThreshold: Double = thresholds.get(reading.id)
      // fetch the last temperature from state; if none has been stored yet,
      // value() returns null, which Scala unboxes to 0.0
      val lastTemp = lastTempState.value()
      // check if we need to emit an alert
      val tempDiff = (reading.temperature - lastTemp).abs
      if (tempDiff > sensorThreshold) {
        // temperature changed by more than the threshold
        out.collect((reading.id, reading.temperature, tempDiff))
      }
    }
    // update lastTemp state
    this.lastTempState.update(reading.temperature)
  }
}
```
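To check the threshold logic without a Flink cluster, here is a plain-Scala replay of what `UpdatableTemperatureAlertFunction` does on a single parallel instance. It is a sketch under stated assumptions: `AlertReplay` is a hypothetical name, the `SensorReading` layout (`id`, `timestamp`, `temperature`) is assumed, and the interleaved input is modeled as a sequence of `Either` values, whereas in Flink broadcast and keyed elements arrive through separate callbacks.

```scala
import scala.collection.mutable

// Assumed record layouts; only the fields used by the example matter.
final case class SensorReading(id: String, timestamp: Long, temperature: Double)
final case class ThresholdUpdate(id: String, threshold: Double)

// Hypothetical replay of the alert logic for one parallel instance.
object AlertReplay {
  def run(input: Seq[Either[ThresholdUpdate, SensorReading]]): Seq[(String, Double, Double)] = {
    val thresholds = mutable.Map.empty[String, Double] // broadcast state copy
    val lastTemp = mutable.Map.empty[String, Double]   // keyed state, per sensor id
    val alerts = Seq.newBuilder[(String, Double, Double)]
    input.foreach {
      case Left(update) =>
        // processBroadcastElement: 0.0 removes the threshold, anything else sets it
        if (update.threshold != 0.0d) thresholds.put(update.id, update.threshold)
        else thresholds.remove(update.id)
      case Right(reading) =>
        // processElement: only sensors with a threshold can raise alerts
        thresholds.get(reading.id).foreach { t =>
          // getOrElse(0.0) mirrors the Flink version, where an empty
          // ValueState[Double] unboxes to 0.0
          val diff = (reading.temperature - lastTemp.getOrElse(reading.id, 0.0)).abs
          if (diff > t) alerts += ((reading.id, reading.temperature, diff))
        }
        // the last temperature is recorded for every reading
        lastTemp.put(reading.id, reading.temperature)
    }
    alerts.result()
  }
}
```

For example, a reading of 20.0 for `s1`, then a threshold of 5.0 for `s1`, then readings of 22.0 and 30.0 produce a single alert `("s1", 30.0, 8.0)`; a later update with threshold 0.0 removes the threshold, so further `s1` readings raise nothing. As in the Flink version, a sensor's very first reading is compared against 0.0.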