聲明:本系列部落格是根據SGG的視訊整理而成,非常适合大家入門學習。
《2021年最新版大資料面試題全面開啟更新》
Broadcast State使用場景
無論是分布式批處理還是流處理,将部分資料同步到所有執行個體上是一個十分常見的需求。例如,我們需要依賴一個不斷變化的控制規則來處理主資料流的資料,主資料流資料量比較大,隻能分散到多個算子執行個體上,控制規則資料相對比較小,可以分發到所有的算子執行個體上。Broadcast State與直接在時間視窗進行兩個資料流的Join的不同點在于,控制規則資料量較小,可以直接放到每個算子執行個體裡,這樣可以大大提高主資料流的處理速度。
我們繼續使用電商平台使用者行為分析為例,不同類型的使用者往往有特定的行為模式,有些使用者購買欲望強烈,有些使用者反複猶豫才下單,有些使用者頻繁爬取資料,有盜刷資料的嫌疑,電商平台營運人員為了提升商品的購買轉化率,保證平台的使用體驗,經常會進行一些使用者行為模式分析。基于這個場景,我們可以建構一個Flink作業,實時監控識别不同模式的使用者。為了避免每次更新規則模式後重新開機部署,我們可以将規則模式作為一個資料流與使用者行為資料流
connect
在一起,并将規則模式以Broadcast State的形式廣播到每個算子執行個體上。
電商使用者行為識别案例
下面開始具體建構一個執行個體程式。第一步,我們定義一些必要的資料結構來描述這個業務場景,包括使用者行為和規則模式兩個資料結構。
/**
* 使用者行為
* categoryId為商品類目ID
* behavior包括點選(pv)、購買(buy)、加購物車(cart)、喜歡(fav)
* */
case class UserBehavior(userId: Long,
itemId: Long,
categoryId: Int,
behavior: String,
timestamp: Long)
/**
* 行為模式
* 整個模式簡化為兩個行為
* */
case class BehaviorPattern(firstBehavior: String, secondBehavior: String)
然後我們在主邏輯中讀取兩個資料流:
// 主資料流
val userBehaviorStream: DataStream[UserBehavior] = ...
// BehaviorPattern資料流
val patternStream: DataStream[BehaviorPattern] = ...
目前Broadcast State隻支援使用Key-Value形式,需要使用
MapStateDescriptor
來描述。這裡我們使用一個比較簡單的行為模式,是以Key是一個空類型。當然我們也可以根據業務場景,構造複雜的Key-Value對。然後,我們将模式流使用
broadcast
方法廣播到所有算子子任務上。
// Broadcast State隻能使用 Key->Value 結構,基于MapStateDescriptor
val broadcastStateDescriptor =
new MapStateDescriptor[Void, BehaviorPattern]("behaviorPattern", classOf[Void], classOf[BehaviorPattern])
val broadcastStream: BroadcastStream[BehaviorPattern] = patternStream
.broadcast(broadcastStateDescriptor)
使用者行為模式流先按照使用者ID進行
keyBy
,然後與廣播流合并:
// 生成一個KeyedStream
val keyedStream = userBehaviorStream.keyBy(user => user.userId)
// 在KeyedStream上進行connect和process
val matchedStream = keyedStream
.connect(broadcastStream)
.process(new BroadcastPatternFunction)
BroadcastPatternFunction
是
KeyedBroadcastProcessFunction
的具體實作,它基于Broadcast State處理主資料流,生成
(Long, BehaviorPattern)
,分别表示使用者ID和命中的行為模式。下面的代碼展示了具體的使用方法。
/**
* 四個泛型分别為:
* 1. KeyedStream中Key的資料類型
* 2. 主資料流的資料類型
* 3. 廣播流的資料類型
* 4. 輸出類型
* */
class BroadcastPatternFunction
extends KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)] {
// 使用者上次性能狀态句柄,每個使用者存儲一個狀态
private var lastBehaviorState: ValueState[String] = _
// Broadcast State Descriptor
private var bcPatternDesc: MapStateDescriptor[Void, BehaviorPattern] = _
override def open(parameters: Configuration): Unit = {
lastBehaviorState = getRuntimeContext.getState(
new ValueStateDescriptor[String]("lastBehaviorState", classOf[String])
)
bcPatternDesc = new MapStateDescriptor[Void, BehaviorPattern]("behaviorPattern", classOf[Void], classOf[BehaviorPattern])
}
// 當BehaviorPattern流有新資料時,更新BroadcastState
override def processBroadcastElement(pattern: BehaviorPattern,
context: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#Context,
collector: Collector[(Long, BehaviorPattern)]): Unit = {
val bcPatternState: BroadcastState[Void, BehaviorPattern] = context.getBroadcastState(bcPatternDesc)
// 将新資料更新至Broadcast State,這裡使用一個null作為Key
// 在本場景中所有資料都共享一個Pattern,是以這裡僞造了一個Key
bcPatternState.put(null, pattern)
}
override def processElement(userBehavior: UserBehavior,
context: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#ReadOnlyContext,
collector: Collector[(Long, BehaviorPattern)]): Unit = {
// 擷取最新的Broadcast State
val pattern: BehaviorPattern = context.getBroadcastState(bcPatternDesc).get(null)
val lastBehavior: String = lastBehaviorState.value()
if (pattern != null && lastBehavior != null) {
// 使用者之前有過行為,檢查是否符合給定的模式
if (pattern.firstBehavior.equals(lastBehavior) &&
pattern.secondBehavior.equals(userBehavior.behavior))
// 目前使用者行為符合模式
collector.collect((userBehavior.userId, pattern))
}
lastBehaviorState.update(userBehavior.behavior)
}
}
總結下來,使用Broadcast State需要進行下面三步:
- 接收一個普通資料流,并使用
方法将其轉換為broadcast
,因為Broadcast State目前隻支援Key-Value結構,需要使用BroadcastStream
描述它的資料結構。MapStateDescriptor
- 将
與一個BroadcastStream
或DataStream
使用KeyedStream
方法連接配接到一起。connect
- 實作一個
,如果主流是ProcessFunction
,則需要實作DataStream
;如果主流是BroadcastProcessFunction
KeyedStream
。這兩種函數都提供了時間和狀态的通路方法。KeyedBroadcastProcessFunction
在
KeyedBroadcastProcessFunction
個函數類中,有兩個函數需要實作:
-
:處理主資料流(非Broadcast流)中的每條元素,輸出零到多個資料。processElement
可以擷取時間和狀态,但是隻能以隻讀的形式讀取Broadcast State,不能修改,以保證每個算子執行個體上的Broadcast State都是相同的。ReadOnlyContext
-
:處理流入的廣播流,可以輸出零到多個資料,一般用來更新Broadcast State。processBroadcastElement
-
在先前注冊的計時器觸發時調用。定時器可以在任何處理方法中注冊,并用于執行計算或将來清理狀态。我們在示例中沒有實作此方法以保持代碼簡潔。但是,當使用者在一段時間内未處于活動狀态時,它可用于删除使用者的最後一個操作,以避免由于非活動使用者而導緻狀态增長。onTimer()