天天看點

Flink實戰案例:狀态管理(十一)自定義操作符狀态(五)廣播狀态(Broadcast state)(三)

聲明:本系列部落格是根據SGG的視訊整理而成,非常适合大家入門學習。

《2021年最新版大資料面試題全面開啟更新》

Broadcast State使用場景

無論是分布式批處理還是流處理,将部分資料同步到所有執行個體上是一個十分常見的需求。例如,我們需要依賴一個不斷變化的控制規則來處理主資料流的資料,主資料流資料量比較大,隻能分散到多個算子執行個體上,控制規則資料相對比較小,可以分發到所有的算子執行個體上。Broadcast State與直接在時間視窗進行兩個資料流的Join的不同點在于,控制規則資料量較小,可以直接放到每個算子執行個體裡,這樣可以大大提高主資料流的處理速度。

Flink實戰案例:狀态管理(十一)自定義操作符狀态(五)廣播狀态(Broadcast state)(三)

我們繼續使用電商平台使用者行為分析為例,不同類型的使用者往往有特定的行為模式,有些使用者購買欲望強烈,有些使用者反複猶豫才下單,有些使用者頻繁爬取資料,有盜刷資料的嫌疑,電商平台營運人員為了提升商品的購買轉化率,保證平台的使用體驗,經常會進行一些使用者行為模式分析。基于這個場景,我們可以建構一個Flink作業,實時監控識别不同模式的使用者。為了避免每次更新規則模式後重新開機部署,我們可以将規則模式作為一個資料流與使用者行為資料流

connect

在一起,并将規則模式以Broadcast State的形式廣播到每個算子執行個體上。

電商使用者行為識别案例

下面開始具體建構一個執行個體程式。第一步,我們定義一些必要的資料結構來描述這個業務場景,包括使用者行為和規則模式兩個資料結構。

Flink實戰案例:狀态管理(十一)自定義操作符狀态(五)廣播狀态(Broadcast state)(三)
/**
    * 使用者行為
    * categoryId為商品類目ID
    * behavior包括點選(pv)、購買(buy)、加購物車(cart)、喜歡(fav)
    * */
case class UserBehavior(userId: Long,
                        itemId: Long,
                        categoryId: Int,
                        behavior: String,
                        timestamp: Long)

/**
    * 行為模式
    * 整個模式簡化為兩個行為
    * */
case class BehaviorPattern(firstBehavior: String, secondBehavior: String)      
Flink實戰案例:狀态管理(十一)自定義操作符狀态(五)廣播狀态(Broadcast state)(三)

然後我們在主邏輯中讀取兩個資料流:

// 主資料流
val userBehaviorStream: DataStream[UserBehavior] = ...
// BehaviorPattern資料流
val patternStream: DataStream[BehaviorPattern] = ...      

目前Broadcast State隻支援使用Key-Value形式,需要使用

MapStateDescriptor

來描述。這裡我們使用一個比較簡單的行為模式,是以Key是一個空類型。當然我們也可以根據業務場景,構造複雜的Key-Value對。然後,我們将模式流使用

broadcast

方法廣播到所有算子子任務上。

// Broadcast State隻能使用 Key->Value 結構,基于MapStateDescriptor
val broadcastStateDescriptor =
new MapStateDescriptor[Void, BehaviorPattern]("behaviorPattern", classOf[Void], classOf[BehaviorPattern])
val broadcastStream: BroadcastStream[BehaviorPattern] = patternStream
.broadcast(broadcastStateDescriptor)      

使用者行為模式流先按照使用者ID進行

keyBy

,然後與廣播流合并:

// 生成一個KeyedStream
val keyedStream =  userBehaviorStream.keyBy(user => user.userId)
// 在KeyedStream上進行connect和process
val matchedStream = keyedStream
  .connect(broadcastStream)
  .process(new BroadcastPatternFunction)      

BroadcastPatternFunction

KeyedBroadcastProcessFunction

的具體實作,它基于Broadcast State處理主資料流,生成

(Long, BehaviorPattern)

,分别表示使用者ID和命中的行為模式。下面的代碼展示了具體的使用方法。

Flink實戰案例:狀态管理(十一)自定義操作符狀态(五)廣播狀态(Broadcast state)(三)
/**
    * 四個泛型分别為:
    * 1. KeyedStream中Key的資料類型
    * 2. 主資料流的資料類型
    * 3. 廣播流的資料類型
    * 4. 輸出類型
    * */
class BroadcastPatternFunction
extends KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)] {

  // 使用者上次性能狀态句柄,每個使用者存儲一個狀态
  private var lastBehaviorState: ValueState[String] = _
  // Broadcast State Descriptor
  private var bcPatternDesc: MapStateDescriptor[Void, BehaviorPattern] = _

  override def open(parameters: Configuration): Unit = {

    lastBehaviorState = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("lastBehaviorState", classOf[String])
    )

    bcPatternDesc = new MapStateDescriptor[Void, BehaviorPattern]("behaviorPattern", classOf[Void], classOf[BehaviorPattern])

  }

  // 當BehaviorPattern流有新資料時,更新BroadcastState
  override def processBroadcastElement(pattern: BehaviorPattern,
                                       context: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#Context,
                                       collector: Collector[(Long, BehaviorPattern)]): Unit = {

    val bcPatternState: BroadcastState[Void, BehaviorPattern] = context.getBroadcastState(bcPatternDesc)
    // 将新資料更新至Broadcast State,這裡使用一個null作為Key
    // 在本場景中所有資料都共享一個Pattern,是以這裡僞造了一個Key
    bcPatternState.put(null, pattern)
  }

  override def processElement(userBehavior: UserBehavior,
                              context: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#ReadOnlyContext,
                              collector: Collector[(Long, BehaviorPattern)]): Unit = {

    // 擷取最新的Broadcast State
    val pattern: BehaviorPattern = context.getBroadcastState(bcPatternDesc).get(null)
    val lastBehavior: String = lastBehaviorState.value()
    if (pattern != null && lastBehavior != null) {
      // 使用者之前有過行為,檢查是否符合給定的模式
      if (pattern.firstBehavior.equals(lastBehavior) &&
          pattern.secondBehavior.equals(userBehavior.behavior))
      // 目前使用者行為符合模式
      collector.collect((userBehavior.userId, pattern))
    }
    lastBehaviorState.update(userBehavior.behavior)
  }
}      
Flink實戰案例:狀态管理(十一)自定義操作符狀态(五)廣播狀态(Broadcast state)(三)

總結下來,使用Broadcast State需要進行下面三步:

  1. 接收一個普通資料流,并使用

    broadcast

    方法将其轉換為

    BroadcastStream

    ,因為Broadcast State目前隻支援Key-Value結構,需要使用

    MapStateDescriptor

    描述它的資料結構。
  2. BroadcastStream

    與一個

    DataStream

    KeyedStream

    使用

    connect

    方法連接配接到一起。
  3. 實作一個

    ProcessFunction

    ,如果主流是

    DataStream

    ,則需要實作

    BroadcastProcessFunction

    ;如果主流是

    KeyedStream

    KeyedBroadcastProcessFunction

    。這兩種函數都提供了時間和狀态的通路方法。

KeyedBroadcastProcessFunction

個函數類中,有兩個函數需要實作:

  • processElement

    :處理主資料流(非Broadcast流)中的每條元素,輸出零到多個資料。

    ReadOnlyContext

     可以擷取時間和狀态,但是隻能以隻讀的形式讀取Broadcast State,不能修改,以保證每個算子執行個體上的Broadcast State都是相同的。
  • processBroadcastElement

    :處理流入的廣播流,可以輸出零到多個資料,一般用來更新Broadcast State。
  • onTimer()

    在先前注冊的計時器觸發時調用。定時器可以在任何處理方法中注冊,并用于執行計算或将來清理狀态。我們在示例中沒有實作此方法以保持代碼簡潔。但是,當使用者在一段時間内未處于活動狀态時,它可用于删除使用者的最後一個操作,以避免由于非活動使用者而導緻狀态增長。

小結

繼續閱讀