了解Spark的閉包
對 Spark 來說, 最困難的事情之一就是在跨叢集執行代碼時了解變量和方法的範圍和生命周期。RDD 在其範圍之外修改變量的操作可能會經常引起混淆。在下面的示例中, 我們将檢視使用 foreach () 遞增計數器的代碼, 但其他操作也可能發生類似的問題。
舉例
考慮下面簡單的RDD元素和,它的行為可能會有所不同,這取決于是否在同一個 JVM 中執行。這方面的一個常見示例是在本地模式下運作 spark (--master=local[n])與将 spark 應用程式部署到叢集(例如,通過 spark-submit 送出到yarn):
var counter = 0
var rdd = sc.parallelize(data)
// 這是錯誤的做法
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
看看scala的舉例

本地模式與叢集模式
上述代碼的行為未定義, 可能無法按預期方式工作。為了執行作業, Spark 會将 RDD 操作的處理分解為任務, 每一個都由一個Executor 完成。在執行之前,Spark 計算任務的閉包。閉包是那些變量和方法必須是可見的執行者在 RDD 上進行計算 (在本例中為 foreach ())。此閉包被序列化并發送到每個 executor上。
counter 時,它不再是驅動器節點上的計數器。 driver 程式節點的記憶體中仍有一個counter,但 executor 程式對此不再可見! executor 隻能看到序列化閉包的副本。 是以,計數器的最終值仍然為零,因為計數器上的所有操作都引用了序列化閉包内的值。
在本地模式下,在某些情況下,foreach 函數實際上将在與 drive r程式相同的 JVM 内執行,并且會引用相同的原始 counter ,并可能實際更新它。
為了確定在這些場景中明确定義的行為,應該使用 Accumulator。 Spark中的 Accumulator專門用于提供一種機制,用于在叢集中的工作節點之間執行拆分時安全地更新變量。 本指南的 Accumulator部分更詳細地讨論了這些内容。
一般來說,閉包結構像循環或本地定義的方法,不應該被用來改變一些全局狀态。 Spark 并沒有定義或保證從封閉外引用的對象的突變行為。 這樣做的一些代碼可以在本地模式下工作,但這隻是偶然,并且這種代碼在分布式模式下的行為不如預期。 如果需要某些全局聚合,請改用累加器。
列印 RDD 的元素
另一個常見的習慣用法是嘗試使用 rdd.foreach(println)或rdd.map(println)列印RDD的元素。 在單台機器上,這将生成預期的輸出并列印所有RDD的元素。 但是,在叢集模式下,由executor程式調用的标準輸出現在寫入執行程式的stdout,而不是driver程式的标準輸出,是以driver程式的stdout不會顯示這些! 要列印driver程式中的所有元素,可以使用collect()方法首先将RDD帶到驅動程式節點:rdd.collect()。foreach(println)。 但是,這可能會導緻driver程式記憶體不足,因為collect()會将整個RDD提取到單台計算機; 如果您隻需要列印RDD的幾個元素,則更安全的方法是使用take():rdd.take(100).foreach(println)。
總結
1、Spark 閉包的大緻作用就是:函數可以通路函數外面的變量,但是函數内對變量的修改,在函數外是不可見的。
2、Scala 閉包和 Spark 閉包不一樣
參考
- http://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-
- https://github.com/highfei2011/SparkDeepDoc