天天看點

廣播變量(Broadcast)-及生命周期

1 Spark中廣播變量概念

廣播變量是spark中共享變量的其中一種。它可以讓程式高效的向所有工作節點發送一個隻讀的值,以供一個或多個spark操作使用。

2 廣播變量使用示例

下面是一段簡單的計算邏輯,對比了不使用廣播變量和使用廣播變量的兩種方式:

val pws = Map("Apache Spark" -> "http://spark.apache.org/", "Scala" -> "http://www.scala-lang.org/")

//不使用廣播變量(比較低效,需要多次将pws變量發送到各個節點)

val time1=System.currentTimeMillis()

val websites = sc.parallelize(Seq("Apache Spark", "Scala")).map(pws).collect

val time2 =System.currentTimeMillis()

println("websites = " + (time2 - time1))

websites.foreach(println)

//使用廣播變量

val pwsB = sc.broadcast(pws)

val time3=System.currentTimeMillis()

val websitesWithBroadcast = sc.parallelize(Seq("Apache Spark", "Scala")).map(pwsB.value).collect

println("websitesWithBroadcast = " + (System.currentTimeMillis() - time3))

websitesWithBroadcast.foreach(println)

輸出:

===============廣播變量使用示例================

cost = 446ms

websites =

http://spark.apache.org/ http://www.scala-lang.org/

cost = 32ms

websitesWithBroadcast =

可以看出,輸出結果是一樣的,但是耗時,廣播變量明顯優于非廣播變量模式。

3 廣播變量原理

将變量廣播到各個執行器:

在spark中,使用SparkContext建立廣播變量,使用broadcastmanager和ContextCleaner管理其生命周期。

org.apache.spark.SparkContext#broadcast核心代碼:

def broadcast

T: ClassTag

: Broadcast[T] = {

//調用broadcastManager建立新的廣播變量

val bc = env.broadcastManager.newBroadcast

T

//注冊廣播變量到ContextCleaner

cleaner.foreach(_.registerBroadcastForCleanup(bc))

bc

}

也可以手動銷毀廣播變量org.apache.spark.broadcast.Broadcast#destroy:

def destroy() {

destroy(blocking = true)

擷取廣播變量的值:

/* Get the broadcasted value. /

def value: T = {

assertValid()

getValue()

轉載位址

http://xiajunhust.github.io/2018/12/23/Spark%E5%9F%BA%E7%A1%80%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B03-%E5%B9%BF%E6%92%AD%E5%8F%98%E9%87%8F/

繼續閱讀