天天看點

Flink中的Broadcast廣播變量

在Flink中,同一個算子可能存在若幹個不同的并行執行個體,計算過程可能不在同一個Slot中進行,不同算子之間更是如此,是以不同算子的計算資料之間不能像Java數組之間一樣互相通路,而廣播變量

Broadcast

便是解決這種情況的。如下代碼所示:

val env = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = env.fromElements("1", "2", "3", "4", "5")
    val ds2 = env.fromElements("a", "b", "c", "d", "e")

    ds1.map{
      t =>
        (t, ds2)
    }.print()
           

運作上述代碼會報

InvalidProgramException

的錯,因為在

ds1

map

算子中無法再去調用

ds2

,此時可以使用廣播變量将

ds2

這個變量進行廣播,使得

ds2

這一被廣播的資料集在

ds1

map

算子的所有并行執行個體中都可用,具體處理方式如下:

object BroadcastTest {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = env.fromElements("1", "2", "3", "4", "5")
    val ds2 = env.fromElements("a", "b", "c", "d", "e")

    ds1.map(new RichMapFunction[String, (String, String)] {
      private var ds2: Traversable[String] = null

      override def open(parameters: Configuration) {
        ds2 = getRuntimeContext.getBroadcastVariable[String]("broadCast").asScala
      }

      def map(t: String): (String, String) = {
        var result = ""
        for (broadVariable <- ds2) {
          result = result + broadVariable + " "
        }
        (t, result)
      }
    }).withBroadcastSet(ds2, "broadCast").print()
  }
}
           

上述代碼能夠成功運作出如下結果:

(1,a b c d e )

(2,a b c d e )

(3,a b c d e )

(4,a b c d e )

(5,a b c d e )

可以看到,

ds1

map

算子通過通路廣播變量成功通路到

ds2

中的資料。該過程分為兩步:設定廣播變量和擷取廣播變量。

  1. 設定廣播變量

    在某個需要用到該廣播變量的算子後調用

    withBroadcastSet(var1, var2)

    進行設定,

    var1

    為需要廣播變量的變量名,

    var2

    是自定義變量名,為

    String

    類型。注意,被廣播的變量隻能為

    DataSet

    類型,不能為

    List

    Int

    String

    等類型。
  2. 擷取廣播變量

    建立該算子對應的富函數類,例如

    map

    函數的富函數類是

    RichMapFunction

    ,該類有兩個構造參數,第一個參數為算子輸入資料類型,第二個參數為算子輸出資料類型。首先建立一個

    Traversable[_]

    接口用于接收廣播變量并初始化為空,接收類型與算子輸入資料類型相對應;然後重寫

    open

    函數,通過

    getRuntimeContext.getBroadcastVariable[_](var)

    擷取到廣播變量,var即為設定廣播變量時的自定義變量名,類型為

    String

    open

    函數在算子生命周期的初始化階段便會調用;最後在

    map

    方法中對擷取到的廣播變量進行通路及其它操作。

注意:隻有在某個

Operator

中使用到不屬于該

Operator

DataSet

時才需要廣播變量,在

iterate

内部可以将某個

DataSet

直接作為起始節點,不需要使用廣播變量。

繼續閱讀