RDD Programming Guide (Scala), translated from the official documentation

Overview

At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.
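
A minimal sketch of both kinds of shared variable follows. It assumes a local master and a hypothetical country-code lookup table. The broadcast variable gives every task read access to the same cached Map. The LongAccumulator collects a count that only the driver reads.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SharedVariablesExample {
  // Hypothetical lookup table that every task needs to read.
  val countryNames = Map("DE" -> "Germany", "FR" -> "France")

  def resolve(sc: SparkContext): (Seq[String], Long) = {
    // Broadcast: each executor caches one read-only copy of the Map.
    val namesBc = sc.broadcast(countryNames)
    // Accumulator: tasks can only add to it; the driver reads the merged total.
    val unknownCodes = sc.longAccumulator("unknownCodes")

    val codes = sc.parallelize(Seq("DE", "FR", "XX", "DE"), 2)
    val resolved = codes.map(code => namesBc.value.getOrElse(code, "unknown")).collect()
    // Accumulator updates made inside an action such as foreach are applied once per task.
    codes.foreach(code => if (!namesBc.value.contains(code)) unknownCodes.add(1L))

    val unknown: Long = unknownCodes.value
    (resolved.toSeq, unknown)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("shared-variables").setMaster("local[2]"))
    try println(resolve(sc)) finally sc.stop()
  }
}
```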

This guide shows each of these features in each of Spark's supported languages. It is easiest to follow along if you launch Spark's interactive shell: either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.

Linking with Spark

Spark 2.4.0 is built and distributed to work with Scala 2.11 by default. (Spark can be built to work with other versions of Scala, too.) To write applications in Scala, you will need to use a compatible Scala version (e.g. 2.11.X).

To write a Spark application, you need to add a Maven dependency on Spark. Spark is available through Maven Central at:

groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.4.0
           

In addition, if you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS.

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
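
If your project uses sbt rather than Maven, the same coordinates could be declared roughly as follows. This is a sketch. Any 2.11.x Scala release should work, and 2.11.12 is only an example. The Hadoop version is a placeholder assumption and should match your cluster's HDFS version.

```scala
// build.sbt (sketch)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // %% appends the Scala binary version, so this resolves to spark-core_2.11
  "org.apache.spark" %% "spark-core" % "2.4.0",
  // Only needed for HDFS access; "2.7.3" is a placeholder for your HDFS version
  "org.apache.hadoop" % "hadoop-client" % "2.7.3"
)
```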
           

Finally, you need to import some Spark classes into your program. Add the following lines:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
           

(Before Spark 1.3.0, you need to explicitly import org.apache.spark.SparkContext._ to enable essential implicit conversions.)

Initializing Spark

The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.

Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one.

val conf = new SparkConf().setAppName(appName).setMaster(master)
val sc = new SparkContext(conf)
           

The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local” to run Spark in-process.
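
Putting these pieces together, a small standalone application might look like the sketch below. The object name SimpleApp and the local[2] fallback are illustrative assumptions. The application leaves the master unset when spark-submit provides one. It always calls stop(), because only one SparkContext may be active per JVM.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def buildConf(): SparkConf = {
    val conf = new SparkConf().setAppName("SimpleApp")
    // spark-submit passes the master in as spark.master; fall back to local
    // mode (2 threads) only when nothing was supplied, e.g. in unit tests.
    if (!conf.contains("spark.master")) conf.setMaster("local[2]")
    conf
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(buildConf())
    try {
      println(s"Running ${sc.appName} on master ${sc.master}")
    } finally {
      // Only one SparkContext may be active per JVM, so release it when done.
      sc.stop()
    }
  }
}
```

Once packaged as a JAR, the program could be launched on a cluster with something like ./bin/spark-submit --class SimpleApp --master yarn simple-app.jar. The name simple-app.jar is hypothetical.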

Using the Shell

In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc. Making your own SparkContext will not work. You can set which master the context connects to using the --master argument, and you can add JARs to the classpath by passing a comma-separated list to the --jars argument. You can also add dependencies (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates to the --packages argument. Any additional repositories where dependencies might exist (e.g. Sonatype) can be passed to the --repositories argument. For example, to run bin/spark-shell on exactly four cores, use:

$ ./bin/spark-shell --master local[4]
           

Or, to also add code.jar to its classpath, use:

$ ./bin/spark-shell --master local[4] --jars code.jar
           

To include a dependency using Maven coordinates:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
           

For a complete list of options, run spark-shell --help. Behind the scenes, spark-shell invokes the more general spark-submit script.

Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

Parallelized Collections

Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program (a Scala Seq). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
           

Once created, the distributed dataset (distData) can be operated on in parallel. For example, we might call distData.reduce((a, b) => a + b) to add up the elements of the array. We describe operations on distributed datasets later on.

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
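
The following sketch assumes a context created with master local[2]. It compares the default partition count with an explicit one and then runs the reduce mentioned above.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ParallelizeExample {
  def run(sc: SparkContext): (Int, Int, Int) = {
    val data = Array(1, 2, 3, 4, 5)
    // Default: Spark chooses the partition count (sc.defaultParallelism).
    val distData = sc.parallelize(data)
    // Explicit: 10 partitions, several of which will be empty for 5 elements.
    val tenSlices = sc.parallelize(data, 10)
    val sum = tenSlices.reduce((a, b) => a + b)
    (distData.getNumPartitions, tenSlices.getNumPartitions, sum)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("parallelize").setMaster("local[2]"))
    try println(run(sc)) finally sc.stop()
  }
}
```

With local[2], the default here is 2 partitions. Local mode sets the default parallelism to the number of threads unless spark.default.parallelism is configured.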

External Datasets

Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3a://, etc. URI) and reads it as a collection of lines. Here is an example invocation:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
           

Once created, distFile can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the map and reduce operations as follows: distFile.map(s => s.length).reduce((a, b) => a + b).

Some notes on reading files with Spark:

If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
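
As a rough illustration, the sketch below writes hypothetical files to a temporary directory. A wildcard restricts textFile to the .txt files, and the second argument asks for at least four partitions. The exact partition count depends on how Hadoop splits the files, so the sketch only reports it.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import org.apache.spark.{SparkConf, SparkContext}

object TextFileGlobExample {
  // Writes a few hypothetical sample files into a fresh temporary directory.
  def writeSampleFiles(): Path = {
    val dir = Files.createTempDirectory("textfile-demo")
    val files = Seq("a.txt" -> "alpha\nbeta\n", "b.txt" -> "gamma\n", "c.log" -> "skipped\n")
    for ((name, body) <- files)
      Files.write(dir.resolve(name), body.getBytes(StandardCharsets.UTF_8))
    dir
  }

  def countTxtLines(sc: SparkContext, dir: Path): (Long, Int) = {
    // The wildcard matches only the .txt files; 4 is the minimum-partitions hint.
    val lines = sc.textFile(s"${dir.toAbsolutePath}/*.txt", 4)
    (lines.count(), lines.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("textfile-glob").setMaster("local[2]"))
    try println(countTxtLines(sc, writeSampleFiles())) finally sc.stop()
  }
}
```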

Apart from text files, Spark’s Scala API also supports several other data formats:

SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file. Partitioning is determined by data locality which, in some cases, may result in too few partitions. For those cases, wholeTextFiles provides an optional second argument for controlling the minimal number of partitions.
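
A small sketch of the (filename, content) shape follows, using two hypothetical files in a temporary directory. Spark reports each key as a full URI, so the code trims it to the bare file name for readability.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesExample {
  // Writes two hypothetical small files into a fresh temporary directory.
  def writeSampleFiles(): Path = {
    val dir = Files.createTempDirectory("whole-text-demo")
    Files.write(dir.resolve("a.txt"), "alpha\nbeta\n".getBytes(StandardCharsets.UTF_8))
    Files.write(dir.resolve("b.txt"), "gamma\n".getBytes(StandardCharsets.UTF_8))
    dir
  }

  def readWhole(sc: SparkContext, dir: Path): Map[String, String] =
    sc.wholeTextFiles(dir.toAbsolutePath.toString, 2) // 2 = minimum number of partitions
      // Keys arrive as full URIs such as file:/tmp/.../a.txt; keep only the file name.
      .map { case (path, content) => (path.substring(path.lastIndexOf('/') + 1), content) }
      .collect()
      .toMap

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("whole-text-files").setMaster("local[2]"))
    try println(readWhole(sc, writeSampleFiles())) finally sc.stop()
  }
}
```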

For SequenceFiles, use SparkContext’s sequenceFile[K, V] method where K and V are the types of key and values in the file. These should be subclasses of Hadoop’s Writable interface, like IntWritable and Text. In addition, Spark allows you to specify native types for a few common Writables; for example, sequenceFile[Int, String] will automatically read IntWritables and Texts.
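
To show the native-type conversion, this sketch assumes a writable temporary directory. It saves an RDD[(Int, String)] with saveAsSequenceFile, which converts the keys and values to IntWritable and Text. It then reads the file back with sequenceFile[Int, String].

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object SequenceFileExample {
  def roundTrip(sc: SparkContext): Seq[(Int, String)] = {
    // The output directory must not exist yet.
    val out = Files.createTempDirectory("seqfile-demo").resolve("pairs").toString
    // Int keys and String values are converted to IntWritable and Text on save.
    sc.parallelize(Seq(1 -> "one", 2 -> "two", 3 -> "three"), 2).saveAsSequenceFile(out)
    // Reading with native types: Spark converts IntWritable/Text back to Int/String.
    sc.sequenceFile[Int, String](out).collect().toSeq.sortBy(_._1)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sequence-file").setMaster("local[2]"))
    try println(roundTrip(sc)) finally sc.stop()
  }
}
```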

For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use SparkContext.newAPIHadoopRDD for InputFormats based on the “new” MapReduce API (org.apache.hadoop.mapreduce).
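
The sketch below reads a hypothetical text file through the new MapReduce API. It configures a Hadoop Job the same way you would for a MapReduce job. It then passes that configuration, TextInputFormat, and the key and value classes to newAPIHadoopRDD.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.hadoop.fs.{Path => HadoopPath}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
import org.apache.spark.{SparkConf, SparkContext}

object NewApiHadoopExample {
  // Writes a hypothetical two-line input file and returns its path.
  def writeSampleFile(): String = {
    val file = Files.createTempFile("hadoop-demo", ".txt")
    Files.write(file, "first\nsecond\n".getBytes(StandardCharsets.UTF_8))
    file.toAbsolutePath.toString
  }

  def readLines(sc: SparkContext, file: String): Seq[String] = {
    // Configure the input the same way as for a Hadoop MapReduce job.
    val job = Job.getInstance(sc.hadoopConfiguration)
    FileInputFormat.addInputPath(job, new HadoopPath(file))
    val records = sc.newAPIHadoopRDD(
      job.getConfiguration, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
    // Keys are byte offsets. Hadoop reuses Writable objects, so copy to String first.
    records.map { case (_, line) => line.toString }.collect().toSeq
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("new-api-hadoop").setMaster("local[2]"))
    try println(readLines(sc, writeSampleFile())) finally sc.stop()
  }
}
```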

RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.
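
Here is a minimal round trip through saveAsObjectFile and objectFile, assuming a writable temporary directory. The Reading case class is hypothetical. It works here because Scala case classes are Java-serializable.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object ObjectFileExample {
  // Hypothetical record type; case classes are Serializable, as objectFile requires.
  case class Reading(sensor: String, value: Double)

  def roundTrip(sc: SparkContext): Seq[Reading] = {
    // saveAsObjectFile expects an output directory that does not exist yet.
    val out = Files.createTempDirectory("objfile-demo").resolve("readings").toString
    sc.parallelize(Seq(Reading("a", 1.5), Reading("b", 2.0)), 2).saveAsObjectFile(out)
    // Part files are not guaranteed to be read in order, so sort before returning.
    sc.objectFile[Reading](out).collect().toSeq.sortBy(_.sensor)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("object-file").setMaster("local[2]"))
    try println(roundTrip(sc)) finally sc.stop()
  }
}
```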

RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.
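
One way to observe this laziness is to count map invocations with an accumulator, as sketched below. The count is still zero after map and only grows once reduce, an action, runs. Updating an accumulator inside a transformation is done here purely for illustration. Spark does not guarantee exactly-once accumulator updates inside transformations if tasks are re-executed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyEvaluationExample {
  def run(sc: SparkContext): (Long, Long, Int) = {
    val mapCalls = sc.longAccumulator("mapCalls")
    // Transformation: recorded in the lineage, but nothing executes yet.
    val doubled = sc.parallelize(1 to 4, 2).map { x => mapCalls.add(1L); x * 2 }
    val callsBefore: Long = mapCalls.value
    // Action: runs the map tasks and returns 2 + 4 + 6 + 8 = 20 to the driver.
    val total = doubled.reduce(_ + _)
    val callsAfter: Long = mapCalls.value
    (callsBefore, callsAfter, total)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lazy-evaluation").setMaster("local[2]"))
    try println(run(sc)) finally sc.stop()
  }
}
```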

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.

Basics

To illustrate RDD basics, consider the simple program below:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
           

The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation. Again, lineLengths is not immediately computed, due to laziness. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.

If we also wanted to use lineLengths again later, we could add:

lineLengths.persist()
           

before the reduce, which would cause lineLengths to be saved in memory after the first time it is computed.
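
A self-contained version of the program above might look like this sketch. It writes a small hypothetical text file to a temporary location in place of data.txt. It then persists lineLengths explicitly and runs a second action on it.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object LineLengthExample {
  // Hypothetical stand-in for data.txt.
  def writeSampleFile(): String = {
    val file = Files.createTempFile("data", ".txt")
    Files.write(file, "hello\nspark\nrdd\n".getBytes(StandardCharsets.UTF_8))
    file.toAbsolutePath.toString
  }

  def run(sc: SparkContext, path: String): (Int, Int) = {
    val lines = sc.textFile(path)                 // not loaded yet
    val lineLengths = lines.map(s => s.length)    // lazy transformation
    lineLengths.persist(StorageLevel.MEMORY_ONLY) // same level as persist() or cache()
    val totalLength = lineLengths.reduce((a, b) => a + b) // computes and caches
    val longest = lineLengths.max()               // normally served from the cache
    lineLengths.unpersist()
    (totalLength, longest)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("line-lengths").setMaster("local[2]"))
    try println(run(sc, writeSampleFile())) finally sc.stop()
  }
}
```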

Passing Functions to Spark

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:

Anonymous function syntax, which can be used for short pieces of code.

Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:

object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)
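
Below is a runnable variant of the singleton-object approach, shown next to the equivalent anonymous function. The body of func1 (trim, then upper-case) is a hypothetical stand-in for the elided one.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MyFunctions {
  // Hypothetical body standing in for the elided one above.
  def func1(s: String): String = s.trim.toUpperCase
}

object PassingFunctionsExample {
  def run(sc: SparkContext): Seq[String] = {
    val myRdd = sc.parallelize(Seq(" spark ", "rdd"))
    // Method of a singleton object: executors reach MyFunctions statically,
    // so no enclosing instance has to be serialized with the function.
    val viaObject = myRdd.map(MyFunctions.func1)
    // The same logic as an anonymous function, handy for short snippets.
    val viaLambda = myRdd.map(s => s.trim.toUpperCase)
    (viaObject.collect() ++ viaLambda.collect()).toSeq
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("passing-functions").setMaster("local[2]"))
    try println(run(sc)) finally sc.stop()
  }
}
```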
           

Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
           

Here, if we create a new MyClass instance and call doStuff on it, the map inside there references the func1 method of that MyClass instance, so the whole object needs to be sent to the cluster. It is similar to writing rdd.map(x => this.func1(x)).

In a similar way, accessing fields of the outer object will reference the whole object:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
           

is equivalent to writing rdd.map(x => this.field + x), which references all of this. To avoid this issue, the simplest way is to copy field into a local variable instead of accessing it externally:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}
           
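The sketch below shows why the local copy matters. For the demonstration, it assumes a MyClass that is deliberately not Serializable. The version that references this.field fails with Spark's "Task not serializable" error when map tries to ship the closure. The version that copies the field first succeeds.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.rdd.RDD

// Deliberately NOT Serializable, to show what a closure can drag along.
class MyClass {
  val field = "Hello"

  // Reads this.field inside the closure, so the whole MyClass instance is captured.
  def doStuffCapturingThis(rdd: RDD[String]): RDD[String] = rdd.map(x => field + x)

  // Copies the field into a local val first, so only a String is captured.
  def doStuff(rdd: RDD[String]): RDD[String] = {
    val field_ = this.field
    rdd.map(x => field_ + x)
  }
}

object ClosureCaptureExample {
  def run(sc: SparkContext): (Boolean, Seq[String]) = {
    val rdd = sc.parallelize(Seq("A", "B"))
    val obj = new MyClass
    // Spark checks closure serializability when map is called, before any job runs.
    val capturingFails =
      try { obj.doStuffCapturingThis(rdd).collect(); false }
      catch { case e: SparkException => e.getMessage.contains("Task not serializable") }
    (capturingFails, obj.doStuff(rdd).collect().toSeq)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("closure-capture").setMaster("local[2]"))
    try println(run(sc)) finally sc.stop()
  }
}
```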

Understanding closures

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we’ll look at code that uses foreach() to increment a counter, but similar issues can occur for other operations as well.

Example

Consider the naive RDD element sum below, which may behave differently depending on whether execution is happening within the same JVM. A common example of this is when running Spark in local mode (--master = local[n]) versus deploying a Spark application to a cluster (e.g. via spark-submit to YARN):

var counter = 0
val rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)
           

Local vs. cluster modes

The behavior of the above code is undefined, and may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.

The variables within the closure sent to each executor are now copies and thus, when counter is referenced within the foreach function, it’s no longer the counter on the driver node. There is still a counter in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero since all operations on counter were referencing the value within the serialized closure.

In local mode, in some circumstances, the foreach function will actually execute within the same JVM as the driver and will reference the same original counter, and may actually update it.

To ensure well-defined behavior in these sorts of scenarios one should use an Accumulator. Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
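
Rewritten with an accumulator, the counter example above becomes well-defined in both local and cluster mode. Here is a minimal sketch, assuming the same five-element data array.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CounterExample {
  def sum(sc: SparkContext): Long = {
    val data = Array(1, 2, 3, 4, 5)
    val rdd = sc.parallelize(data)
    // Each task adds to its own copy; Spark merges the updates back on the driver.
    val counter = sc.longAccumulator("counter")
    rdd.foreach(x => counter.add(x.toLong))
    val total: Long = counter.value
    total
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("counter").setMaster("local[2]"))
    try println("Counter value: " + sum(sc)) finally sc.stop()
  }
}
```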

In general, closures (constructs like loops or locally defined methods) should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but only by accident, and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.

Printing elements of an RDD

Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). On a single machine, this will generate the expected output and print all the RDD's elements. In cluster mode, however, println runs on the executors and writes to each executor's own stdout, not to the driver's, so stdout on the driver won't show these elements! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine. If you only need to print a few elements of the RDD, a safer approach is to use take(): rdd.take(100).foreach(println).
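
Here is a short sketch of the safer pattern, assuming an RDD of 1,000 integers. take(5) brings only five elements back, and println then runs on the driver in every deploy mode.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PrintingExample {
  def preview(sc: SparkContext): Seq[Int] = {
    val rdd = sc.parallelize(1 to 1000, 4)
    // take(n) returns at most n elements to the driver instead of the whole RDD.
    val firstFew = rdd.take(5)
    firstFew.foreach(println) // runs on the driver, so the output appears there
    firstFew.toSeq
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("printing").setMaster("local[2]"))
    try preview(sc) finally sc.stop()
  }
}
```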

Working with Key-Value Pairs

While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. The most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements by a key.

In Scala, these operations are automatically available on RDDs containing Tuple2 objects (the built-in tuples in the language, created by simply writing (a, b)). The key-value pair operations are available in the PairRDDFunctions class, which automatically wraps around an RDD of tuples.

For example, the following code uses the reduceByKey operation on key-value pairs to count how many times each line of text occurs in a file:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
           

We could also use counts.sortByKey(), for example, to sort the pairs alphabetically, and finally counts.collect() to bring them back to the driver program as an array of objects.
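
Below is a runnable sketch of the line-count program, followed by the sortByKey and collect steps. Here sc.parallelize over a hypothetical list of lines stands in for sc.textFile("data.txt").

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LineCountExample {
  def countLines(sc: SparkContext, input: Seq[String]): Seq[(String, Int)] = {
    val lines = sc.parallelize(input)        // stand-in for sc.textFile("data.txt")
    val pairs = lines.map(s => (s, 1))       // RDD[(String, Int)]: PairRDDFunctions apply
    val counts = pairs.reduceByKey((a, b) => a + b)
    counts.sortByKey().collect().toSeq       // alphabetical order, gathered on the driver
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("line-count").setMaster("local[2]"))
    try println(countLines(sc, Seq("b", "a", "b", "c"))) finally sc.stop()
  }
}
```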

Note: when using custom objects as the key in key-value pair operations, you must be sure that a custom equals() method is accompanied with a matching hashCode() method. For full details, see the contract outlined in the Object.hashCode() documentation.
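
In Scala, a case class is a convenient way to satisfy this contract. The compiler generates a structural equals() together with a consistent hashCode(). The sketch below uses a hypothetical PageKey as the key for reduceByKey.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CustomKeyExample {
  // Hypothetical composite key; the case class supplies equals() and a matching hashCode().
  case class PageKey(site: String, page: String)

  def hits(sc: SparkContext): Map[PageKey, Int] = {
    val visits = sc.parallelize(Seq(
      PageKey("docs", "/rdd") -> 1,
      PageKey("docs", "/rdd") -> 1,
      PageKey("blog", "/post") -> 1), 3)
    // Equal keys hash to the same partition, so reduceByKey can combine them.
    visits.reduceByKey(_ + _).collectAsMap().toMap
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("custom-key").setMaster("local[2]"))
    try println(hits(sc)) finally sc.stop()
  }
}
```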

Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Transformation Meaning
map(func) Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func) Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
sample(withReplacement, fraction, seed) Sample the given fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numPartitions]) Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.
reduceByKey(func, [numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numPartitions]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numPartitions]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numPartitions]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
cartesian(otherDataset) When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]) Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner) Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
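
Here are a few of these transformations in action, as a sketch over small hypothetical datasets. It covers flatMap with filter, mapPartitionsWithIndex, and coalesce. It also uses aggregateByKey to compute a per-key average, where the (sum, count) accumulator type differs from the Int values.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TransformationsExample {
  def run(sc: SparkContext): (Seq[String], Seq[(Int, Int)], Map[String, Double], Int) = {
    // flatMap: each line yields 0 or more words; filter drops empty strings.
    val words = sc.parallelize(Seq("a b", "", "c"), 2).flatMap(_.split(" ")).filter(_.nonEmpty)

    // mapPartitionsWithIndex: func receives (partition index, Iterator[T]), returns Iterator[U].
    val sizes = sc.parallelize(1 to 6, 3).mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))

    // aggregateByKey: the (sum, count) result type differs from the Int value type.
    val averages = sc.parallelize(Seq(("x", 1), ("x", 3), ("y", 4)))
      .aggregateByKey((0, 0))(
        (acc, v) => (acc._1 + v, acc._2 + 1), // seqOp: fold a value into a partition-local pair
        (a, b) => (a._1 + b._1, a._2 + b._2)) // combOp: merge pairs from different partitions
      .mapValues { case (sum, count) => sum.toDouble / count }

    // coalesce: reduce the number of partitions without a full shuffle.
    val fewer = sc.parallelize(1 to 100, 8).coalesce(2)

    (words.collect().toSeq, sizes.collect().toSeq,
      averages.collectAsMap().toMap, fewer.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("transformations").setMaster("local[2]"))
    try println(run(sc)) finally sc.stop()
  }
}
```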
Actions

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Action Meaning
reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count() Return the number of elements in the dataset.
first() Return the first element of the dataset (similar to take(1)).
take(n) Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]) Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) (Java and Scala) Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path) (Java and Scala) Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey() Only available on RDDs of type (K, V). Returns a hashmap of (K, Long) pairs with the count of each key.
foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

The Spark RDD API also exposes asynchronous versions of some actions, like foreachAsync for foreach, which immediately return a FutureAction to the caller instead of blocking on completion of the action. This can be used to manage or wait for the asynchronous execution of the action.

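A hedged sketch of the asynchronous variants. It assumes spark-core on the classpath and a local master; `countAsync` and `foreachAsync` are reached through Spark's implicit conversion to `AsyncRDDActions`, and blocking with `Await.result` is only one way to consume the returned `FutureAction`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncActionsDemo {
  // Starts two asynchronous actions, then waits for them from the driver.
  def run(): (Long, Boolean) = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("async-demo"))
    try {
      val rdd = sc.parallelize(1 to 1000)
      val countFuture = rdd.countAsync()            // returns immediately with a FutureAction[Long]
      val foreachFuture = rdd.foreachAsync(_ => ()) // FutureAction[Unit]
      // The driver could do other work here before waiting.
      val n = Await.result(countFuture, 1.minute)
      Await.result(foreachFuture, 1.minute)
      (n, foreachFuture.isCompleted)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```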

Shuffle operations

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.


Background

To understand what happens during the shuffle we can consider the example of the reduceByKey operation. The reduceByKey operation generates a new RDD where all values for a single key are combined into a tuple - the key and the result of executing a reduce function against all values associated with that key. The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to compute the result.


In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.

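To make the all-to-all step concrete, here is a toy model of reduceByKey using plain Scala collections (no Spark). Partitions are modelled as sequences of key-value pairs and records are routed by a non-negative `hashCode` modulo, in the spirit of Spark's HashPartitioner; `ShuffleSketch` and its methods are hypothetical names, and the sketch ignores serialization, disk and network.

```scala
object ShuffleSketch {
  type Partition[K, V] = Seq[(K, V)]

  // Map side: pick the reduce partition for a key (non-negative modulo of its hash code).
  def targetPartition[K](key: K, numReducers: Int): Int = {
    val mod = key.hashCode % numReducers
    if (mod < 0) mod + numReducers else mod
  }

  // All-to-all step: every input partition may contribute records to every output partition.
  def shuffle[K, V](input: Seq[Partition[K, V]], numReducers: Int): Seq[Partition[K, V]] =
    (0 until numReducers).map { r =>
      input.flatMap(_.filter { case (k, _) => targetPartition(k, numReducers) == r })
    }

  // Reduce side: each output partition now holds every value for its keys and can combine them locally.
  def reduceByKey[K, V](input: Seq[Partition[K, V]], numReducers: Int)(f: (V, V) => V): Seq[Map[K, V]] =
    shuffle(input, numReducers).map { part =>
      part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
    }
}
```

For example, `ShuffleSketch.reduceByKey(Seq(Seq("a" -> 1, "b" -> 1), Seq("a" -> 1, "c" -> 1)), 2)(_ + _)` places each key in exactly one output partition, and the merged result is `Map("a" -> 2, "b" -> 1, "c" -> 1)`.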

Although the set of elements in each partition of newly shuffled data will be deterministic, and so is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably ordered data following shuffle then it’s possible to use:


mapPartitions to sort each partition using, for example, .sorted

repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning

sortBy to make a globally ordered RDD
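The three options above can be compared side by side. This sketch assumes spark-core and a local master; the sample pairs are made up, and `glom()` is used only to expose each partition's contents for inspection.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object OrderedShuffleDemo {
  def run(): (Array[Array[Int]], Array[Array[(Int, String)]], Array[Int]) = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("ordering-demo"))
    try {
      val pairs = sc.parallelize(Seq(5 -> "e", 1 -> "a", 4 -> "d", 2 -> "b", 3 -> "c", 6 -> "f"), 3)

      // 1. Shuffle, then sort each partition locally (materializes one partition at a time).
      val locallySorted = pairs.keys.repartition(2)
        .mapPartitions(it => it.toVector.sorted.iterator)
        .glom().collect()

      // 2. Repartition by key and sort within each partition as part of the same shuffle.
      val repartSorted = pairs
        .repartitionAndSortWithinPartitions(new HashPartitioner(2))
        .glom().collect()

      // 3. A total order across all partitions.
      val global = pairs.keys.sortBy(k => k).collect()

      (locallySorted, repartSorted, global)
    } finally sc.stop()
  }
}
```

With `HashPartitioner(2)` and Int keys, even keys land in partition 0 and odd keys in partition 1, each sorted ascending.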

Operations which can cause a shuffle include repartition operations like repartition and coalesce, *ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

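One way to check which operations introduce a shuffle is to walk an RDD's dependencies looking for a `ShuffleDependency`. A sketch assuming spark-core on the classpath and a local master; `hasShuffle` is a hypothetical helper, and `toDebugString` prints the same lineage in readable form.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ShuffleLineageDemo {
  // True if any RDD in the lineage depends on its parent through a shuffle.
  def hasShuffle(rdd: RDD[_]): Boolean =
    rdd.dependencies.exists(d => d.isInstanceOf[ShuffleDependency[_, _, _]] || hasShuffle(d.rdd))

  def run(): Map[String, Boolean] = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("lineage-demo"))
    try {
      val pairs = sc.parallelize(Seq("a", "b", "a", "c"), 2).map(w => (w, 1)) // narrow
      val counts = pairs.reduceByKey(_ + _)   // *ByKey operation
      val fewer = pairs.coalesce(1)           // coalesce without shuffle = true
      val more = pairs.repartition(4)         // always shuffles
      println(counts.toDebugString)
      Map(
        "map" -> hasShuffle(pairs),
        "reduceByKey" -> hasShuffle(counts),
        "coalesce(1)" -> hasShuffle(fewer),
        "repartition(4)" -> hasShuffle(more))
    } finally sc.stop()
  }
}
```

reduceByKey can skip the shuffle when its input is already partitioned by the same partitioner; that case is not exercised here.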

Performance Impact

The shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations.


Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.

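A toy, in-memory model of the map-side write described above: one map task's records are sorted by target partition and an offset index records where each reduce partition's block begins, so a reduce task can read just its slice. This is a simplified illustration (no spilling, files or serialization); `SortShuffleSketch`, `MapOutput` and `partitionOf` are hypothetical names.

```scala
object SortShuffleSketch {
  // One map task's output: records grouped by target partition plus start offsets per partition.
  final case class MapOutput[K, V](records: Vector[(K, V)], offsets: Vector[Int]) {
    // A reduce task fetches only the contiguous block for its partition.
    def block(reducer: Int): Vector[(K, V)] = records.slice(offsets(reducer), offsets(reducer + 1))
  }

  def writeMapOutput[K, V](records: Seq[(K, V)], numReducers: Int, partitionOf: K => Int): MapOutput[K, V] = {
    // sortBy is stable, so records keep their input order within a partition.
    val sorted = records.toVector.sortBy { case (k, _) => partitionOf(k) }
    val counts = (0 until numReducers).map(r => sorted.count { case (k, _) => partitionOf(k) == r })
    val offsets = counts.scanLeft(0)(_ + _).toVector // numReducers + 1 entries
    MapOutput(sorted, offsets)
  }
}
```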

Certain shuffle operations can consume significant amounts of heap memory since they employ in-memory data structures to organize records before or after transferring them. Specifically, reduceByKey and aggregateByKey create these structures on the map side, and *ByKey operations generate these on the reduce side. When data does not fit in memory Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection.

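The effect of building these structures on the map side shows up in how many records leave a map partition. A plain-Scala sketch, not Spark code: `combineLocally` stands in for the map-side aggregation that reduceByKey performs, while a groupByKey-style shuffle ships every record; both names are hypothetical.

```scala
object MapSideCombineSketch {
  // groupByKey-style: every record in the partition is shipped to the reducers.
  def shippedWithoutCombine[K, V](partition: Seq[(K, V)]): Int = partition.size

  // reduceByKey-style: values are merged per key before shipping, so at most one record per distinct key leaves.
  def combineLocally[K, V](partition: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    partition.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(f(_, v)).getOrElse(v))
    }
}
```

For `Seq("a" -> 1, "a" -> 1, "b" -> 1, "a" -> 1)`, four records would be shipped without combining, but only two (`a -> 3`, `b -> 1`) after the map-side combine.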

Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are preserved until the corresponding RDDs are no longer used and are garbage collected. This is done so the shuffle files don’t need to be re-created if the lineage is re-computed. Garbage collection may happen only after a long period of time, if the application retains references to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may consume a large amount of disk space. The temporary storage directory is specified by the spark.local.dir configuration parameter when configuring the Spark context.


Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the ‘Shuffle Behavior’ section within the Spark Configuration Guide.

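A hedged configuration sketch for the scratch directory and a couple of shuffle settings. `spark.local.dir`, `spark.shuffle.compress` and `spark.shuffle.file.buffer` are documented Spark properties; the directory path and the `64k` buffer value are placeholder assumptions. Cluster managers may override `spark.local.dir` (for example through `SPARK_LOCAL_DIRS` in standalone mode or `LOCAL_DIRS` on YARN).

```scala
import org.apache.spark.SparkConf

object ShuffleConfSketch {
  // Builds a SparkConf with a dedicated scratch directory and two shuffle settings.
  def conf(scratchDir: String): SparkConf =
    new SparkConf(loadDefaults = false)        // ignore spark.* system properties for a predictable result
      .setAppName("shuffle-conf-sketch")
      .set("spark.local.dir", scratchDir)      // where shuffle and spill files are written
      .set("spark.shuffle.compress", "true")   // compress map output files
      .set("spark.shuffle.file.buffer", "64k") // in-memory buffer per shuffle file output stream
}
```

The resulting conf would be passed to `new SparkContext(conf)` together with a master URL.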

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.


You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

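A small sketch of marking an RDD for caching and reusing it across two actions. It assumes spark-core and a local master; the dataset is arbitrary.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CacheDemo {
  def run(): (Long, Long, StorageLevel) = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("cache-demo"))
    try {
      // cache() only marks the RDD; nothing is computed yet.
      val squares = sc.parallelize(1 to 100000).map(n => n.toLong * n).cache()
      val firstCount = squares.count()                // first action computes and stores the partitions
      val evens = squares.filter(_ % 2 == 0).count()  // served from the cached partitions if they fit
      (firstCount, evens, squares.getStorageLevel)
    } finally sc.stop()
  }
}
```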

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), or replicate it across nodes. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:


Storage Level Meaning
MEMORY_ONLY Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they’re needed. This is the default level.
MEMORY_AND_DISK Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed.
MEMORY_ONLY_SER (Java and Scala) Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER (Java and Scala) Similar to MEMORY_ONLY_SER, but spill partitions that don’t fit in memory to disk instead of recomputing them on the fly each time they’re needed.
DISK_ONLY Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental) Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.
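Choosing a non-default level is a matter of passing a `StorageLevel` to `persist()`. This sketch (spark-core and a local master assumed, sample data made up) also shows that an RDD's storage level cannot be changed once assigned: a second `persist()` with a different level throws `UnsupportedOperationException`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistLevelDemo {
  def run(): (StorageLevel, Boolean, Long) = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("persist-demo"))
    try {
      val parsed = sc.parallelize(Seq("a,1", "b,2", "c,3"))
        .map(_.split(","))
        .map(a => (a(0), a(1).toInt))
      parsed.persist(StorageLevel.MEMORY_AND_DISK_SER) // serialized in memory, spill to disk if needed
      val total = parsed.values.sum().toLong
      // Reassigning a different level is rejected.
      val changeRejected =
        try { parsed.persist(StorageLevel.DISK_ONLY); false }
        catch { case _: UnsupportedOperationException => true }
      (parsed.getStorageLevel, changeRejected, total)
    } finally sc.stop()
  }
}
```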

Note: In Python, stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level. The available storage levels in Python include MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, and DISK_ONLY_2.


Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.


Which Storage Level to Choose?

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:


If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.


If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)


Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.


Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.

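For the MEMORY_ONLY_SER route, a fast serializer is chosen through `SparkConf`. A sketch assuming Kryo as that serializer; `Click` is a hypothetical record type, and registering it lets Kryo avoid writing the full class name with every object.

```scala
import org.apache.spark.SparkConf

// Hypothetical record type that would be stored in a MEMORY_ONLY_SER RDD.
final case class Click(userId: Long, url: String)

object KryoConfSketch {
  def conf(): SparkConf =
    new SparkConf(loadDefaults = false)
      .setAppName("kryo-sketch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Click])) // registered classes serialize more compactly
}
```

An `RDD[Click]` created with this conf would then be persisted with `StorageLevel.MEMORY_ONLY_SER`.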

Removing Data

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.

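A sketch of manual removal, assuming spark-core and a local master. `blocking = true` makes `unpersist` wait until the cached blocks are gone; afterwards the RDD reports `StorageLevel.NONE` and no longer appears in `getPersistentRDDs`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object UnpersistDemo {
  def run(): (StorageLevel, StorageLevel, Int) = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("unpersist-demo"))
    try {
      val rdd = sc.parallelize(1 to 10).cache()
      rdd.count()                                 // materialize the cached partitions
      val before = rdd.getStorageLevel
      val persistedBefore = sc.getPersistentRDDs.size
      rdd.unpersist(blocking = true)              // remove now instead of waiting for LRU eviction
      (before, rdd.getStorageLevel, persistedBefore - sc.getPersistentRDDs.size)
    } finally sc.stop()
  }
}
```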

Shared Variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.


Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.


Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.


Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code below shows this:


scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
           

After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).

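A sketch of the pattern described above: broadcast a read-only lookup table once, then refer to it through `.value` inside the closure instead of capturing the original map. The country table, codes and local master are illustrative assumptions; `unpersist()` drops the executor copies, which Spark re-sends if the broadcast is used again.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastDemo {
  def run(): Array[(String, String)] = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("broadcast-demo"))
    try {
      // Read-only lookup table; it must not be modified after broadcasting.
      val countryNames = sc.broadcast(Map("DE" -> "Germany", "FR" -> "France", "JP" -> "Japan"))
      val codes = sc.parallelize(Seq("FR", "JP", "XX", "DE"), 2)
      // Tasks read countryNames.value rather than capturing the Map directly.
      val resolved = codes.map(c => c -> countryNames.value.getOrElse(c, "unknown")).collect()
      countryNames.unpersist()
      resolved
    } finally sc.stop()
  }
}
```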

Accumulators

Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.


As a user, you can create named or unnamed accumulators. As seen in the image below, a named accumulator (in this instance counter) will display in the web UI for the stage that modifies that accumulator. Spark displays the value for each accumulator modified by a task in the “Tasks” table.


(Image: a named accumulator, counter, displayed in the Spark web UI)

Tracking accumulators in the UI can be useful for understanding the progress of running stages (NOTE: this is not yet supported in Python).


A numeric accumulator can be created by calling SparkContext.longAccumulator() or SparkContext.doubleAccumulator() to accumulate values of type Long or Double, respectively. Tasks running on a cluster can then add to it using the add method. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.


The code below shows an accumulator being used to add up the elements of an array:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10
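Named accumulators also work as simple data-quality counters. A sketch assuming spark-core and a local master; the sample lines are made up, and `count` and `avg` are bookkeeping that `LongAccumulator` and `DoubleAccumulator` keep alongside the summed value.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorStatsDemo {
  def run(): (Long, Long, Double) = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("acc-stats-demo"))
    try {
      val malformed = sc.longAccumulator("malformed records") // named, so it appears in the web UI
      val lengths = sc.doubleAccumulator("line lengths")
      val lines = sc.parallelize(Seq("1", "2", "oops", "4", ""), 2)
      // Updates made inside an action (foreach) are applied once per task.
      lines.foreach { line =>
        lengths.add(line.length.toDouble)
        if (line.isEmpty || !line.forall(_.isDigit)) malformed.add(1L)
      }
      // Only the driver reads the results.
      (malformed.value, lengths.count, lengths.avg)
    } finally sc.stop()
  }
}
```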

           

While this code used the built-in support for accumulators of type Long, programmers can also create their own types by subclassing AccumulatorV2. The AccumulatorV2 abstract class has several methods which one has to override: reset for resetting the accumulator to zero, add for adding another value into the accumulator, merge for merging another same-type accumulator into this one. Other methods that must be overridden are contained in the API documentation. For example, supposing we had a MyVector class representing mathematical vectors, we could write:


import org.apache.spark.util.AccumulatorV2

// MyVector is the hypothetical vector class described above.
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  // Reset the accumulator to its zero value.
  def reset(): Unit = {
    myVector.reset()
  }

  // Add another vector into this accumulator.
  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
           

Note that, when programmers define their own type of AccumulatorV2, the resulting type can be different than that of the elements added.

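A complete, self-contained variant of the custom accumulator idea, using a hypothetical `DistinctStringsAccumulator` in place of `MyVector`. It implements every abstract member of `AccumulatorV2` (`isZero`, `copy`, `reset`, `add`, `merge`, `value`) and illustrates the note above: the elements added are `String`s while the result is a `Set[String]`.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Collects the distinct strings seen by tasks.
class DistinctStringsAccumulator extends AccumulatorV2[String, Set[String]] {
  private val seen = mutable.HashSet.empty[String]

  override def isZero: Boolean = seen.isEmpty
  override def copy(): DistinctStringsAccumulator = {
    val acc = new DistinctStringsAccumulator
    acc.seen ++= seen
    acc
  }
  override def reset(): Unit = seen.clear()
  override def add(v: String): Unit = seen += v
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit = other match {
    case o: DistinctStringsAccumulator => seen ++= o.seen
    case _ => throw new UnsupportedOperationException(s"Cannot merge with ${other.getClass.getName}")
  }
  override def value: Set[String] = seen.toSet
}

object DistinctStringsAccumulatorUsage {
  // Register before use, as with VectorAccumulatorV2 above.
  def register(sc: SparkContext): DistinctStringsAccumulator = {
    val acc = new DistinctStringsAccumulator
    sc.register(acc, "distinct tags")
    acc
  }
}
```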

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware that each task’s update may be applied more than once if tasks or job stages are re-executed.

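The following sketch makes the re-execution caveat visible on a local master (spark-core assumed; names and data are illustrative). Without caching, each action recomputes the `map`, so its accumulator updates are applied again. Caching avoided the recomputation in this run, but an evicted or lost cached partition would be recomputed, so exact counts belong in actions such as `foreach`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorRecomputeDemo {
  def run(): (Long, Long) = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("recompute-demo"))
    try {
      val data = sc.parallelize(1 to 4, 2)

      // Accumulator updated inside a transformation, RDD not cached.
      val seenUncached = sc.longAccumulator("seen (uncached)")
      val mapped = data.map { x => seenUncached.add(1L); x }
      mapped.count()
      mapped.count() // the map runs again, adding 4 more

      // Same pattern with the mapped RDD cached after its first computation.
      val seenCached = sc.longAccumulator("seen (cached)")
      val mappedCached = data.map { x => seenCached.add(1L); x }.cache()
      mappedCached.count()
      mappedCached.count() // reads cached partitions, so the map is not re-run here

      (seenUncached.value, seenCached.value)
    } finally sc.stop()
  }
}
```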

Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map(). The below code fragment demonstrates this property:


val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
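Continuing the fragment above with a hypothetical `data` RDD of four `Long`s, the value stays at 0 until an action forces the `map` to run. A sketch assuming spark-core and a local master.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyAccumulatorDemo {
  def run(): (Long, Long) = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("lazy-acc-demo"))
    try {
      val accum = sc.longAccumulator
      val data = sc.parallelize(Seq(1L, 2L, 3L, 4L))
      val mapped = data.map { x => accum.add(x); x }
      val beforeAction = accum.value // map has not run yet
      mapped.count()                 // the action triggers the map
      (beforeAction, accum.value)
    } finally sc.stop()
  }
}
```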
           
Deploying to a Cluster

The application submission guide describes how to submit applications to a cluster. In short, once you package your application into a JAR (for Java/Scala) or a set of .py or .zip files (for Python), the bin/spark-submit script lets you submit it to any supported cluster manager.


Launching Spark jobs from Java / Scala

The org.apache.spark.launcher package provides classes for launching Spark jobs as child processes using a simple Java API.

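A sketch of `SparkLauncher` usage. The jar path and main class are hypothetical placeholders, and launching requires `SPARK_HOME` (or `setSparkHome`) to point at a Spark installation; `startApplication()` returns a `SparkAppHandle` that can be polled for the child application's state.

```scala
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LauncherSketch {
  // Configures, but does not start, a child Spark application.
  def launcher(appJar: String, mainClass: String): SparkLauncher =
    new SparkLauncher()
      .setAppResource(appJar)
      .setMainClass(mainClass)
      .setMaster("local[2]")
      .setConf(SparkLauncher.DRIVER_MEMORY, "1g")

  def main(args: Array[String]): Unit = {
    // Hypothetical jar and class; replace with your packaged application.
    val handle: SparkAppHandle = launcher("/path/to/my-app.jar", "com.example.MyApp").startApplication()
    while (!handle.getState.isFinal) Thread.sleep(1000) // poll until the child process finishes
    println(s"Final state: ${handle.getState}")
  }
}
```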

Unit Testing

Spark is friendly to unit testing with any popular unit test framework. Simply create a SparkContext in your test with the master URL set to local, run your operations, and then call SparkContext.stop() to tear it down. Make sure you stop the context within a finally block or the test framework’s tearDown method, as Spark does not support two contexts running concurrently in the same program.

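A framework-agnostic sketch of this pattern: a loan-style helper that creates a local SparkContext, runs the test body and always calls `stop()` in a `finally` block. `withLocalContext` and the word-count check are hypothetical; with ScalaTest or a similar framework, the same helper would be called from a test case.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LocalSparkTesting {
  // One local context per test body, always stopped so the next test can create its own.
  def withLocalContext[T](body: SparkContext => T): T = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("unit-test"))
    try body(sc)
    finally sc.stop()
  }

  // Example test: word counting exercised against a local context.
  def wordCountTest(): Unit = withLocalContext { sc =>
    val counts = sc.parallelize(Seq("a b", "b c", "b"))
      .flatMap(_.split(" "))
      .map(w => (w, 1))
      .reduceByKey(_ + _)
      .collectAsMap()
    assert(counts == Map("a" -> 1, "b" -> 3, "c" -> 1), s"unexpected counts: $counts")
  }
}
```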

Where to Go from Here

You can see some example Spark programs on the Spark website. In addition, Spark includes several samples in the examples directory (Scala, Java, Python, R). You can run Java and Scala examples by passing the class name to Spark’s bin/run-example script; for instance:


./bin/run-example SparkPi
           

For Python examples, use spark-submit instead:

./bin/spark-submit examples/src/main/python/pi.py
           

For R examples, use spark-submit instead:

./bin/spark-submit examples/src/main/r/dataframe.R
           

For help on optimizing your programs, the configuration and tuning guides provide information on best practices. They are especially important for making sure that your data is stored in memory in an efficient format. For help on deploying, the cluster mode overview describes the components involved in distributed operation and supported cluster managers.


Finally, full API documentation is available in Scala, Java, Python and R.
