天天看點

Spark程式設計指南之八:共享變量、廣播變量、累加器

文章目錄

    • 從Spark官網閉包說起
    • 共享變量
      • 示例圖
      • 廣播變量(針對隻讀變量)Broadcast Variables
        • 廣播變量有什麼作用?
      • 累加器(隻能在Driver端擷取)Accumulator

從Spark官網閉包說起

Spark官網Understanding closures(閉包)部分指出,Spark的一個難點在于了解變量和方法的範圍和生命周期。

//使用foreach()計算
var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)
           

以上代碼将不會正确計算出counter值。Spark會将counter的副本發送到每個executor上,對counter副本的修改不會改變Driver端的counter,counter的值仍然是0。

在這種計數或求和的場景可以使用累加器(Accumulator),下面将詳細說明。

參考:http://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-

共享變量

Spark計算傳遞的函數是在遠端節點上執行的,函數中使用的是變量的副本。這些變量被複制到每台機器上,變量的更新不會傳回到Driver端。

示例圖

Spark程式計算的時候,函數是分布到每一個節點去執行,所有的變量副本會傳給節點計算,但是計算完成後,變量的更新不會傳回到driver端。如下圖:

代碼示例:

val rdd = sc. textFile("README.md")  //下文中rdd預設使用此值,README.md為Spark源碼中的文檔
val count = rdd.count()
var i = 0
val mapRDD = rdd.map(line => {
  i = i + 1
  (line,line.size) 
})
println("i:" + i)
println("count:" + count)
           

i:0

count:103

對于跨叢集跨任務的變量共享,Spark提供了兩種方式:廣播變量和累加器。

廣播變量(針對隻讀變量)Broadcast Variables

核心:将隻讀變量緩存在每個worker節點上。

一:每個節點隻要傳輸一次給Executor,可以給多個Task使用。

二:節點上的變量副本并不都來自與Driver,而是采用了高效廣播算法(TorrentBroadcast)降低通信成本。

廣播變量有什麼作用?

用作Join時的調優map-side join

Join操作 ——》 Shuffle操作 ——》 性能問題

如果Join兩邊的RDD,有一個RDD的數量較小,可以将小的RDD資料廣播出去,将Join轉化為map-side join

注意事項: RDD是不能直接廣播的。要廣播資料,而RDD要遇到Action操作才會執行。

直接廣播RDD會提示"Can not directly broadcast RDDs; instead, call collect() and broadcast the result."

val rddData = rdd.collectAsMap()
val rddBC = sc.broadcast(rddData)
val rdd3 = rdd2.mapPartitions(partition => {
	val bc = rddBC.value
	for{
		(key,value) <- partition
		if(rddData.contains(key))
	}yield(key, (bc.get(key).getOrElse(""),value))
})
           

累加器(隻能在Driver端擷取)Accumulator

累加器用于Spark任務的計數和求和場景。

累加器隻能累加,擷取隻能在driver端,節點是不知道累加器的值的。

累加器的使用需謹慎,如以下示例:

val acc = sc.longAccumulator("counterAcc")
val mapRDD = rdd.map(line => {
	acc.add(1)
	(line, line.size)
})
val count = mapRDD.count()
val count2 = mapRDD.count()
println("acc.value)
           

mapRDD.count()兩次Action會導緻累加器的值翻倍。上述情況使用緩存可以防止acc.value的值翻倍。但實際情況中不推薦使用累加器。

  1. 執行多個action會出錯。
  2. 雖然可以緩存,但是緩存可能被清除。
  3. 累加器僅用于程式調試。

繼續閱讀