
How Spark broadcast variables work: Spark 2.3 (43): Spark Broadcast summary; [Spark series] Broadcast variables and accumulators in Spark...

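Why use broadcast variables?

In Spark, the logic inside an operator is shipped to the Executors and runs there. When that logic references a variable defined outside it on the Driver, the variable should be wrapped in a broadcast variable, especially when it is large.

In more detail:

If the Executor side uses a Driver-side variable without a broadcast variable, an Executor holds as many copies of that variable as it has tasks.

If the Executor side uses a Driver-side variable through a broadcast variable, each Executor holds only one copy, shared by all of its tasks.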
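The difference between the two cases can be sketched as follows. This is a minimal illustration, not code from the original article: the object name `BroadcastCopiesSketch`, the `countryNames` table, and the `local[2]` master are assumptions chosen so the example runs without a cluster.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastCopiesSketch {
  // Hypothetical Driver-side lookup table, used only for illustration.
  val countryNames: Map[String, String] =
    Map("CN" -> "China", "US" -> "United States", "DE" -> "Germany")

  /** Resolves country codes twice: through a closure-captured map and through a broadcast. */
  def resolve(sc: SparkContext, codes: Seq[String]): (Array[String], Array[String]) = {
    val table = countryNames // local val, so the closures capture only the map
    val rdd = sc.parallelize(codes, numSlices = 4)

    // Without broadcast: `table` travels inside the task closure, and every
    // task deserializes its own copy of the map.
    val viaClosure = rdd.map(c => table.getOrElse(c, "unknown")).collect()

    // With broadcast: the closure carries only a small handle. Each Executor
    // fetches the map once, and its tasks share that copy through .value.
    val tableBc = sc.broadcast(table)
    val viaBroadcast = rdd.map(c => tableBc.value.getOrElse(c, "unknown")).collect()

    (viaClosure, viaBroadcast)
  }

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption so the sketch runs on a single machine.
    val sc = new SparkContext(
      new SparkConf().setAppName("broadcast-copies-sketch").setMaster("local[2]"))
    try {
      val (viaClosure, viaBroadcast) = resolve(sc, Seq("CN", "US", "DE", "US"))
      println(viaClosure.mkString(", "))
      println(viaBroadcast.mkString(", "))
    } finally sc.stop()
  }
}
```

Both variants return the same result; what changes is how many copies of the table each Executor ends up holding, which matters once the table is large.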

Definition of Broadcast in Spark

Official definition:

A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Broadcast variables are created from a variable v by calling SparkContext.broadcast(T, scala.reflect.ClassTag). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The interpreter session below shows this:


scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).


Class definition in Broadcast.scala:

package org.apache.spark.broadcast

import java.io.Serializable

import scala.reflect.ClassTag

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging {

  @volatile private var _isValid = true

  private var _destroySite = ""

  def value: T = {
    assertValid()
    getValue()
  }

  def unpersist() {
    unpersist(blocking = false)
  }

  def unpersist(blocking: Boolean) {
    assertValid()
    doUnpersist(blocking)
  }

  def destroy() {
    destroy(blocking = true)
  }

  private[spark] def destroy(blocking: Boolean) {
    assertValid()
    _isValid = false
    _destroySite = Utils.getCallSite().shortForm
    logInfo("Destroying %s (from %s)".format(toString, _destroySite))
    doDestroy(blocking)
  }

  private[spark] def isValid: Boolean = {
    _isValid
  }

  protected def getValue(): T

  protected def doUnpersist(blocking: Boolean)

  protected def doDestroy(blocking: Boolean)

  protected def assertValid() {
    if (!_isValid) {
      throw new SparkException(
        "Attempted to use %s after it was destroyed (%s) ".format(toString, _destroySite))
    }
  }

  override def toString: String = "Broadcast(" + id + ")"
}

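Concepts implied by this class:

1) A broadcast variable must be defined on the Driver, not on an Executor, because it is created through the SparkContext, which exists only on the Driver.

2) The unpersist(), unpersist(blocking: Boolean), destroy(), and destroy(blocking: Boolean) methods must be called on the Driver. Note that destroy(blocking: Boolean) is private[spark] in the listing above, so application code calls destroy().

3) The value of a broadcast variable can only be changed from the Driver, never from an Executor, where it is read-only. On the Driver, the safe way to change it is to broadcast a new variable rather than to mutate the original object after it has been broadcast.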
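The Driver-side lifecycle implied by these points can be sketched as below. This is an illustrative example under stated assumptions, not the article's own code: the object name `BroadcastLifecycleSketch`, the values 10 and 100, and the `local[2]` master are all hypothetical. It relies only on the public `value`, `unpersist()`, and `destroy()` methods shown in the class listing.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

object BroadcastLifecycleSketch {
  /** Runs two jobs around unpersist/re-broadcast/destroy and reports
    * whether reading a destroyed broadcast fails. */
  def run(sc: SparkContext): (Array[Int], Array[Int], Boolean) = {
    val numbers = sc.parallelize(1 to 4, numSlices = 2)

    val factorBc = sc.broadcast(10)
    val first = numbers.map(_ * factorBc.value).collect()

    // Driver-side call: drops the cached copies on Executors. The variable
    // stays valid and is sent again if a later job uses it.
    factorBc.unpersist()

    // To "change" the value, broadcast a new variable instead of mutating
    // the old one; later jobs reference the new handle.
    val newFactorBc = sc.broadcast(100)
    val second = numbers.map(_ * newFactorBc.value).collect()

    // Driver-side call: releases the old variable for good. Any later use
    // trips assertValid() and raises SparkException.
    factorBc.destroy()
    val failedAfterDestroy =
      try { factorBc.value; false }
      catch { case _: SparkException => true }

    newFactorBc.destroy()
    (first, second, failedAfterDestroy)
  }

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption so the sketch runs on a single machine.
    val sc = new SparkContext(
      new SparkConf().setAppName("broadcast-lifecycle-sketch").setMaster("local[2]"))
    try {
      val (first, second, failed) = run(sc)
      println(first.mkString(", "))
      println(second.mkString(", "))
      println(s"read after destroy failed: $failed")
    } finally sc.stop()
  }
}
```

Use unpersist() when the variable may be needed again, and destroy() only once no further job will read it.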

Broadcast workflow

[Figure: Broadcast workflow diagram; image not preserved]

Notes on using Broadcast

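1. Can an RDD itself be broadcast?

No. An RDD does not store data; it only describes how to compute it. Collect the RDD's result to the Driver first, then broadcast that result.

2. A broadcast variable can only be defined on the Driver, not on an Executor.

3. The value of a broadcast variable can only be changed from the Driver, never from an Executor, where it is read-only. On the Driver, change it by broadcasting a new variable rather than by mutating the original object after it has been broadcast.

4. If the Executor side uses a Driver-side variable without a broadcast variable, an Executor holds as many copies of that variable as it has tasks.

5. If the Executor side uses a Driver-side variable through a broadcast variable, each Executor holds only one copy.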
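Note 1 above (broadcast the result of an RDD, not the RDD) can be sketched as follows. This is a minimal example under stated assumptions: the object name `BroadcastRddResultSketch`, the `users` and `orders` data, and the `local[2]` master are hypothetical, and the approach only fits when the collected result is small enough to hold in Driver memory.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastRddResultSketch {
  /** Labels each order with a user name looked up from a broadcast map. */
  def labelOrders(sc: SparkContext): Array[(String, Double)] = {
    // Hypothetical small dimension data and larger fact data.
    val users = sc.parallelize(Seq((1, "alice"), (2, "bob")))
    val orders = sc.parallelize(Seq((1, 9.5), (2, 3.0), (3, 1.25)), numSlices = 1)

    // The RDD is only a description of a computation, so it is materialized
    // on the Driver with collectAsMap() and the resulting map is broadcast.
    val userById = sc.broadcast(users.collectAsMap())

    // Map-side lookup against the broadcast copy; no shuffle is needed.
    orders.map { case (userId, amount) =>
      (userById.value.getOrElse(userId, "unknown"), amount)
    }.collect()
  }

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption so the sketch runs on a single machine.
    val sc = new SparkContext(
      new SparkConf().setAppName("broadcast-rdd-result-sketch").setMaster("local[2]"))
    try labelOrders(sc).foreach(println)
    finally sc.stop()
  }
}
```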