在Flink中,同一個算子可能存在若幹個不同的并行執行個體,計算過程可能不在同一個Slot中進行,不同算子之間更是如此,是以不同算子的計算資料之間不能像Java數組之間一樣互相通路,而廣播變量
Broadcast
便是解決這種情況的。如下代碼所示:
val env = ExecutionEnvironment.getExecutionEnvironment
val ds1 = env.fromElements("1", "2", "3", "4", "5")
val ds2 = env.fromElements("a", "b", "c", "d", "e")
ds1.map{
t =>
(t, ds2)
}.print()
運作上述代碼會報
InvalidProgramException
的錯,因為在
ds1
的
map
算子中無法再去調用
ds2
,此時可以使用廣播變量将
ds2
這個變量進行廣播,使得
ds2
這一被廣播的資料集在
ds1
中
map
算子的所有并行執行個體中都可用,具體處理方式如下:
object BroadcastTest {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds1 = env.fromElements("1", "2", "3", "4", "5")
val ds2 = env.fromElements("a", "b", "c", "d", "e")
ds1.map(new RichMapFunction[String, (String, String)] {
private var ds2: Traversable[String] = null
override def open(parameters: Configuration) {
ds2 = getRuntimeContext.getBroadcastVariable[String]("broadCast").asScala
}
def map(t: String): (String, String) = {
var result = ""
for (broadVariable <- ds2) {
result = result + broadVariable + " "
}
(t, result)
}
}).withBroadcastSet(ds2, "broadCast").print()
}
}
上述代碼能夠成功運作出如下結果:
(1,a b c d e )
(2,a b c d e )
(3,a b c d e )
(4,a b c d e )
(5,a b c d e )
可以看到,
ds1
的
map
算子通過通路廣播變量成功通路到
ds2
中的資料。該過程分為兩步:設定廣播變量和擷取廣播變量。
-
設定廣播變量
在某個需要用到該廣播變量的算子後調用
進行設定,withBroadcastSet(var1, var2)
為需要廣播變量的變量名,var1
是自定義變量名,為var2
類型。注意,被廣播的變量隻能為String
類型,不能為DataSet
、List
、Int
等類型。String
-
擷取廣播變量
建立該算子對應的富函數類,例如
函數的富函數類是map
,該類有兩個構造參數,第一個參數為算子輸入資料類型,第二個參數為算子輸出資料類型。首先建立一個RichMapFunction
接口用于接收廣播變量并初始化為空,接收類型與算子輸入資料類型相對應;然後重寫Traversable[_]
函數,通過open
擷取到廣播變量,var即為設定廣播變量時的自定義變量名,類型為getRuntimeContext.getBroadcastVariable[_](var)
,String
函數在算子生命周期的初始化階段便會調用;最後在open
方法中對擷取到的廣播變量進行通路及其它操作。map
注意:隻有在某個
Operator
中使用到不屬于該
Operator
的
DataSet
時才需要廣播變量,在
iterate
内部可以将某個
DataSet
直接作為起始節點,不需要使用廣播變量。