文章目錄
-
- 從Spark官網閉包說起
- 共享變量
-
- 示例圖
- 廣播變量(針對隻讀變量)Broadcast Variables
-
- 廣播變量有什麼作用?
- 累加器(隻能在Driver端擷取)Accumulator
從Spark官網閉包說起
Spark官網Understanding closures(閉包)部分指出,Spark的一個難點在于了解變量和方法的範圍和生命周期。
//使用foreach()計算
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
以上代碼将不會正确計算出counter值。Spark會将counter的副本發送到每個executor上,對counter副本的修改不會改變Driver端的counter,counter的值仍然是0。
在這種計數或求和的場景可以使用累加器(Accumulator),下面将詳細說明。
參考:http://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-
共享變量
Spark計算傳遞的函數是在遠端節點上執行的,函數中使用的是變量的副本。這些變量被複制到每台機器上,變量的更新不會傳回到Driver端。
示例圖
Spark程式計算的時候,函數是分布到每一個節點去執行,所有的變量副本會傳給節點計算,但是計算完成後,變量的更新不會傳回到driver端。如下圖:
代碼示例:
val rdd = sc. textFile("README.md") //下文中rdd預設使用此值,README.md為Spark源碼中的文檔
val count = rdd.count()
var i = 0
val mapRDD = rdd.map(line => {
i = i + 1
(line,line.size)
})
println("i:" + i)
println("count:" + count)
i:0
count:103
對于跨叢集跨任務的變量共享,Spark提供了兩種方式:廣播變量和累加器。
廣播變量(針對隻讀變量)Broadcast Variables
核心:将隻讀變量緩存在每個worker節點上。
一:每個節點隻要傳輸一次給Executor,可以給多個Task使用。
二:節點上的變量副本并不都來自與Driver,而是采用了高效廣播算法(TorrentBroadcast)降低通信成本。
廣播變量有什麼作用?
用作Join時的調優map-side join
Join操作 ——》 Shuffle操作 ——》 性能問題
如果Join兩邊的RDD,有一個RDD的數量較小,可以将小的RDD資料廣播出去,将Join轉化為map-side join
注意事項: RDD是不能直接廣播的。要廣播資料,而RDD要遇到Action操作才會執行。
直接廣播RDD會提示"Can not directly broadcast RDDs; instead, call collect() and broadcast the result."
val rddData = rdd.collectAsMap()
val rddBC = sc.broadcast(rddData)
val rdd3 = rdd2.mapPartitions(partition => {
val bc = rddBC.value
for{
(key,value) <- partition
if(rddData.contains(key))
}yield(key, (bc.get(key).getOrElse(""),value))
})
累加器(隻能在Driver端擷取)Accumulator
累加器用于Spark任務的計數和求和場景。
累加器隻能累加,擷取隻能在driver端,節點是不知道累加器的值的。
累加器的使用需謹慎,如以下示例:
val acc = sc.longAccumulator("counterAcc")
val mapRDD = rdd.map(line => {
acc.add(1)
(line, line.size)
})
val count = mapRDD.count()
val count2 = mapRDD.count()
println("acc.value)
mapRDD.count()兩次Action會導緻累加器的值翻倍。上述情況使用緩存可以防止acc.value的值翻倍。但實際情況中不推薦使用累加器。
- 執行多個action會出錯。
- 雖然可以緩存,但是緩存可能被清除。
- 累加器僅用于程式調試。