天天看点

[Spark基础]--理解Spark闭包

理解Spark的闭包

对 Spark 来说, 最困难的事情之一就是在跨集群执行代码时了解变量和方法的范围和生命周期。RDD 在其范围之外修改变量的操作可能会经常引起混淆。在下面的示例中, 我们将查看使用 foreach () 递增计数器的代码, 但其他操作也可能发生类似的问题。

举例

    考虑下面简单的RDD元素和,它的行为可能会有所不同,这取决于是否在同一个 JVM 中执行。这方面的一个常见示例是在本地模式下运行 spark (--master=local[n])与将 spark 应用程序部署到集群(例如,通过 spark-submit 提交到yarn):

var counter = 0
var rdd = sc.parallelize(data)

// 这是错误的做法
rdd.foreach(x => counter += x)

println("Counter value: " + counter)      

看看scala的举例

[Spark基础]--理解Spark闭包

本地模式与集群模式

     上述代码的行为未定义, 可能无法按预期方式工作。为了执行作业, Spark 会将 RDD 操作的处理分解为任务, 每一个都由一个Executor 完成。在执行之前,Spark 计算任务的闭包。闭包是那些变量和方法必须是可见的执行者在 RDD 上进行计算 (在本例中为 foreach ())。此闭包被序列化并发送到每个 executor上。

counter 时,它不再是驱动器节点上的计数器。 driver 程序节点的内存中仍有一个counter,但 executor 程序对此不再可见! executor 只能看到序列化闭包的副本。 因此,计数器的最终值仍然为零,因为计数器上的所有操作都引用了序列化闭包内的值。

      在本地模式下,在某些情况下,foreach 函数实际上将在与 drive r程序相同的 JVM 内执行,并且会引用相同的原始 counter ,并可能实际更新它。

为了确保在这些场景中明确定义的行为,应该使用 ​​Accumulator​​​。 Spark中的 ​​Accumulator​​​专门用于提供一种机制,用于在集群中的工作节点之间执行拆分时安全地更新变量。 本指南的 ​​Accumulator​​部分更详细地讨论了这些内容。

      一般来说,闭包结构像循环或本地定义的方法,不应该被用来改变一些全局状态。 Spark 并没有定义或保证从封闭外引用的对象的突变行为。 这样做的一些代码可以在本地模式下工作,但这只是偶然,并且这种代码在分布式模式下的行为不如预期。 如果需要某些全局聚合,请改用累加器。

打印 RDD 的元素

      另一个常见的习惯用法是尝试使用 rdd.foreach(println)或rdd.map(println)打印RDD的元素。 在单台机器上,这将生成预期的输出并打印所有RDD的元素。 但是,在集群模式下,由executor程序调用的标准输出现在写入执行程序的stdout,而不是driver程序的标准输出,因此driver程序的stdout不会显示这些! 要打印driver程序中的所有元素,可以使用collect()方法首先将RDD带到驱动程序节点:rdd.collect()。foreach(println)。 但是,这可能会导致driver程序内存不足,因为collect()会将整个RDD提取到单台计算机; 如果您只需要打印RDD的几个元素,则更安全的方法是使用take():rdd.take(100).foreach(println)。

总结

1、Spark 闭包的大致作用就是:函数可以访问函数外面的变量,但是函数内对变量的修改,在函数外是不可见的。

2、Scala 闭包和 Spark 闭包不一样

参考

  1. ​​http://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-​​
  2. ​​https://github.com/highfei2011/SparkDeepDoc​​

继续阅读