Understanding closures
Source: Spark official documentation (translated with WPS)
Spark
->Programming Guides
->RDD Programming Guide
->Resilient Distributed Datasets (RDDs)
->RDD Operations
-> Understanding closures
1. What are Spark closures, and what are they used for?
2. How do you perform a global aggregation over an RDD?
3. How do you print all the elements of an RDD when running in cluster mode?
One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion.
In the example below we'll look at code that uses foreach() to increment a counter, but similar issues can occur for other operations as well.
Example
Consider the naive RDD element sum below, which may behave differently depending on whether execution is happening within the same JVM. A common example of this is when running Spark in local mode (--master = local[n]) versus deploying a Spark application to a cluster (e.g. via spark-submit to YARN):
Scala
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
Local vs. cluster modes
The behavior of the above code is undefined, and may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task's closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.
Note: the closure consists of the variables and methods that must be visible to each executor in order to perform its computations on the RDD.
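To make the captured-variable idea concrete, here is a minimal sketch (the value factor and the sample data are illustrative, not from the original): a read-only value referenced inside map() is copied into the task's closure and shipped to every executor, which works fine, whereas mutating such a copy (as counter above does) never reaches the driver.
Scala
val factor = 10                           // defined on the driver
val rdd = sc.parallelize(1 to 5)
// `factor` is captured in the closure of map(); each executor works on its own copy
val scaled = rdd.map(x => x * factor)
println(scaled.collect().mkString(","))   // 10,20,30,40,50 -- reading a captured copy is fine
// mutating a captured variable (as `counter` above does) only changes the executor-local copy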
The variables within the closure sent to each executor are now copies and thus, when counter is referenced within the foreach function, it's no longer the counter on the driver node. There is still a counter in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero since all operations on counter were referencing the value within the serialized closure.
In local mode, in some circumstances the foreach function will actually execute within the same JVM as the driver and will reference the same original counter, and may actually update it.
To ensure well-defined behavior in these sorts of scenarios one should use an Accumulator. Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
In general, closures - constructs like loops or locally defined methods - should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.
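As a sketch of the Accumulator-based fix for the counter example above (assuming the same sc and an rdd of numeric elements), the sum can be accumulated safely like this:
Scala
// Correct: use an accumulator for a global aggregation
val counter = sc.longAccumulator("counter")
rdd.foreach(x => counter.add(x))
println("Counter value: " + counter.value)   // the accumulated sum is read back on the driver
// for a plain sum, rdd.reduce(_ + _) or rdd.sum() also aggregates globally and returns the result to the driver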
Printing elements of an RDD
Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). On a single machine, this will generate the expected output and print all the RDD's elements. However, in cluster mode, the output to stdout being called by the executors is now writing to the executor's stdout instead, not the one on the driver, so stdout on the driver won't show these! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println).
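A minimal sketch of both approaches (the RDD contents here are only illustrative):
Scala
val rdd = sc.parallelize(Seq("a", "b", "c"))
rdd.collect().foreach(println)     // brings the whole RDD to the driver; can exhaust driver memory
rdd.take(100).foreach(println)     // safer: fetches at most the first 100 elements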