天天看點

spark api之一:Spark官方文檔 - 中文翻譯

1 概述(Overview)

總體來講,每一個Spark驅動程式應用都由一個驅動程式組成,該驅動程式包含一個由使用者編寫的main方法,該方法會在叢集上并行執行一些列并行計算操作。Spark最重要的一個概念是彈性分布式資料集,簡稱RDD(resilient distributed dataset )。RDD是一個資料容器,它将分布在叢集上各個節點上的資料抽象為一個資料集,并且RDD能夠進行一系列的并行計算操作。可以将RDD了解為一個分布式的List,該List的資料為分布在各個節點上的資料。RDD通過讀取Hadoop檔案系統中的一個檔案進行建立,也可以由一個RDD經過轉換得到。使用者也可以将RDD緩存至記憶體,進而高效的處理RDD,提高計算效率。另外,RDD有良好的容錯機制。

Spark另外一個重要的概念是共享變量(shared variables)。在并行計算時,可以友善的使用共享變量。在預設情況下,執行Spark任務時會在多個節點上并行執行多個task,Spark将每個變量的副本分發給各個task。在一些場景下,需要一個能夠在各個task間共享的變量。Spark支援兩種類型的共享變量:

  • 廣播變量(broadcast variables):将一個隻讀變量緩存到叢集的每個節點上。例如,将一份資料的隻讀緩存分發到每個節點。
  • 累加變量(accumulators):隻允許add操作,用于計數、求和。

2 引入Spark(Linking with Spark)

在Spark 1.6.0上編寫應用程式,支援使用Scala 2.10.X、Java 7+、Python 2.6+、R 3.1+。如果使用Java 8,支援lambda表達式(lambda expressions)。

在編寫Spark應用時,需要在Maven依賴中添加Spark,Spark的Maven Central為:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.0      

另外,如果Spark應用中需要通路HDFS叢集,則需要在hadoop-client中添加對應版本的HDFS依賴:

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>      

最後,需要在程式中添加Spark類。代碼如下:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf      

(在Spark 1.3.0之前的版本,使用Scala語言編寫Spark應用程式時,需要添加​

​import org.apache.spark.SparkContext._​

​來啟用必要的隐式轉換)

3 初始化Spark(Initializing Spark)

使用Scala編寫Spark程式的需要做的第一件事就是建立一個SparkContext對象(使用Java語言時建立JavaSparkContext)。SparkContext對象指定了Spark應用通路叢集的方式。建立SparkContext需要先建立一個SparkConf對象,SparkConf對象包含了Spark應用的一些列資訊。代碼如下:

  • Scala
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)      
  • java
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);      

appName參數為應用程式在叢集的UI上顯示的名字。master為Spark、Mesos、YARN URL或local。使用local值時,表示在本地模式下運作程式。應用程式的執行模型也可以在使用​

​spark-submit​

​指令送出任務時進行指定。

3.1 使用Spark Shell(Using the Shell)

在Spark Shell下,一個特殊的SparkContext對象已經幫使用者建立好,變量為sc。使用參數​

​--master​

​設定master參數值,使用參數​

​--jars​

​設定依賴包,多個jar包使用逗号分隔。可以使用​

​--packages​

​參數指定Maven坐标來添加依賴包,多個坐标使用逗号分隔。可以使用參數​

​--repositories​

​添加外部的repository。示例如下:

  • 本地模式下,使用4個核運作Spark程式:
$ ./bin/spark-shell --master local[4]      
  • 将code.jar包添加到classpath:
$ ./bin/spark-shell --master local[4] --jars code.jar      
  • 使用Maven坐标添加一個依賴:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"      

詳細的Spark Shell參數描述請執行指令​

​spark-shell --help​

​。更多的spark-submit腳本請見​​spark-submit script​​。

4 彈性分布式資料集(RDDs)

Spark最重要的一個概念就是RDD,RDD是一個有容錯機制的元素容器,它可以進行并行運算操作。得到RDD的方式有兩個:

  • 通過并行化驅動程式中已有的一個集合而獲得
  • 通過外部存儲系統(例如共享的檔案系統、HDFS、HBase等)的資料集進行建立

4.1 并行集合(Parallelized Collections)

在驅動程式中,在一個已經存在的集合上(例如一個Scala的Seq)調用SparkContext的parallelize方法可以建立一個并行集合。集合裡的元素将被複制到一個可被并行操作的分布式資料集中。下面為并行化一個儲存數字1到5的集合示例:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)      
  • Java
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);      

當分布式資料集建立之後,就可以進行并行操作。例如,可以調用方法​

​distData.reduce((a,b) => a + b)​

​求數組内元素的和。Spark支援的分布式資料集上的操作将在後面章節中較長的描述。

并行集合的一個重要的參數是表示将資料劃分為幾個分區(partition)的分區數。Spark将在叢集上每個資料分區上啟動一個task。通常情況下,你可以在叢集上為每個CPU設定2-4個分區。一般情況下,Spark基于叢集自動設定分區數目。也可以手動進行設定,設定該參數需要将參數值作為第二參數傳給parallelize方法,例如:​

​sc.parallelize(data, 10)​

​。注意:在代碼中,部分位置使用術語slices(而不是partition),這麼做的原因是為了保持版本的向後相容性。

4.2 外部資料庫(External Datasets)

Spark可以通過Hadoop支援的外部資料源建立分布式資料集,Hadoop支援的資料源有本地檔案系統、HDFS、Cassandra、HBase、​​Amazon S3​​、Spark支援的文本檔案、​​SequenceFiles​​、Hadoop ​​InputFormat​​。

SparkContext的testFile方法可以建立文本檔案RDD。使用這個方法需要傳遞文本檔案的URI,URI可以為本機檔案路徑、hdfs://、s3n://等。該方法讀取文本檔案的每一行至容器中。示例如下:

scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08      
JavaRDD<String> distFile = sc.textFile("data.txt");      

建立之後,distFile就可以進行資料集的通用操作。例如,使用map和reduce操作計算所有行的長度的總和:​

​distFile.map(s => s.length).reduce((a, b) => a + b)​

​。

使用Spark讀取檔案需要注意一下幾點:

  • 程式中如果使用到本地檔案路徑,在其它worker節點上該檔案必須在同一目錄,并有通路權限。在這種情況下,可以将檔案複制到所有的worker節點,也可以使用網絡内的共享檔案系統。
  • Spark所有的基于檔案輸入的方法(包括​

    ​textFile​

    ​),都支援檔案夾、壓縮檔案、通配符。例如:​

    ​textFile("/my/directory")​

    ​、​

    ​textFile("/my/directory/*.txt")​

    ​textFile("/my/directory/*.gz")​

  • textFile方法提供了一個可選的第二參數,用于控制檔案的分區數。預設情況下,Spark為檔案的每個塊建立一個分區(塊使用HDFS的預設值64MB),通過設定這個第二參數可以修改這個預設值。需要注意的是,分區數不能小于塊數。

除了文本檔案之外,Spark還支援其它的資料格式:

  • ​SparkContext.wholeTextFiles​

    ​能夠讀取指定目錄下的許多小文本檔案,傳回(filename,content)對。而textFile隻能讀取一個文本檔案,傳回該文本檔案的每一行。
  • 對于​​SequenceFiles​​可以使用SparkContext的​

    ​sequenceFile[K,V]​

    ​方法,其中K是檔案中key和value的類型。它們必須為像IntWritable和Text那樣,是Hadoop的Writable接口的子類。另外,對于通用的Writable,Spark允許使用者指定原生類型。例如,​

    ​sequenceFile[Int,String]​

    ​将自動讀取IntWritable和Text。
  • 對于其他Hadoop InputFormat,可以使用​

    ​SparkContext.hadoopRDD​

    ​方法,該方法接收任意類型的JobConf和輸入格式類、鍵類型和值類型。可以像設定Hadoop job那樣設定輸入源。對于InputFormat還可以使用基于新版本MapReduce API(​

    ​org.apache.hadoop.mapreduce​

    ​)的​

    ​SparkContext.newAPIHadoopRDD​

    ​。(老版本接口為:​

    ​SparkContext.newHadoopRDD​

    ​)
  • ​RDD.saveAsObjectFile​

    ​和​

    ​SparkContext.objectFile​

    ​能夠儲存包含簡單的序列化Java對象的RDD。但是這個方法不如Avro高效,Avro能夠友善的儲存任何RDD。

4.3 RDD操作(RDD Operations)

RDD支援兩種類型的操作:

  • transformation:從一個RDD轉換為一個新的RDD。
  • action:基于一個資料集進行運算,并傳回RDD。

例如,map是一個transformation操作,map将資料集的每一個元素按指定的函數轉換為一個RDD傳回。reduce是一個action操作,reduce将RDD的所有元素按指定的函數進行聚合并傳回結果給驅動程式(還有一個并行的reduceByKey能夠傳回一個分布式的資料集)。

Spark的所有transformation操作都是懶執行,它們并不立馬執行,而是先記錄對資料集的一系列transformation操作。在執行一個需要執行一個action操作時,會執行該資料集上所有的transformation操作,然後傳回結果。這種設計讓Spark的運算更加高效,例如,對一個資料集map操作之後使用reduce隻傳回結果,而不傳回龐大的map運算的結果集。

預設情況下,每個轉換的RDD在執行action操作時都會重新計算。即使兩個action操作會使用同一個轉換的RDD,該RDD也會重新計算。在這種情況下,可以使用​

​persist​

​方法或​

​cache​

​方法将RDD緩存到記憶體,這樣在下次使用這個RDD時将會提高計算效率。在這裡,也支援将RDD持久化到磁盤,或在多個節點上複制。

4.3.1 基礎(Basics)

參考下面的程式,了解RDD的基本輪廓:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)      
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);      

第一行通過讀取一個檔案建立了一個基本的RDD。這個資料集沒有加載到記憶體,也沒有進行其他的操作,變量lines僅僅是一個指向檔案的指針。第二行為transformation操作map的結果。此時lineLengths也沒有進行運算,因為map操作為懶執行。最後,執行action操作reduce。此時Spark将運算分隔成多個任務分發給多個機器,每個機器執行各自部分的map并進行本地reduce,最後傳回運作結果給驅動程式。

如果在後面的運算中仍會用到lineLengths,可以将其緩存,在reduce操作之前添加如下代碼,該persist操作将在lineLengths第一次被計算得到後将其緩存到記憶體:

lineLengths.persist()      
lineLengths.persist(StorageLevel.MEMORY_ONLY());      

4.3.2 把函數傳遞到Spark(Passing Functions to Spark)

  • ScalaSpark的API,在很大程度上依賴于把驅動程式中的函數傳遞到叢集上運作。這有兩種推薦的實作方式:
  • 使用​​匿名函數​​的文法,這可以讓代碼更加簡潔。
  • 使用全局單例對象的靜态方法。比如,你可以定義函數對象object MyFunctions,然後将該對象的​

    ​MyFunction.func1​

    ​方法傳遞給Spark,如下所示:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)      

注意:由于可能傳遞的是一個類執行個體方法的引用(而不是一個單例對象),在傳遞方法的時候,應該同時傳遞包含該方法的類對象。舉個例子:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}      

上面示例中,如果我們建立了一個類執行個體new MyClass,并且調用了執行個體的doStuff方法,該方法中的map操作調用了這個MyClass執行個體的func1方法,是以需要将整個對象傳遞到叢集中。類似于寫成:rdd.map(x=>this.func1(x))。

類似地,通路外部對象的字段時将引用整個對象:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}      

等同于寫成rdd.map(x=>this.field+x),引用了整個this。為了避免這種問題,最簡單的方式是把field拷貝到本地變量,而不是去外部通路它:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}      
  • JavaSpark的API,在很大程度上依賴于把驅動程式中的函數傳遞到叢集上運作。在Java中,函數由那些實作了org.apache.spark.api.java.function包中的接口的類表示。有兩種建立這樣的函數的方式:
  • 在你自己的類中實作Function接口,可以是匿名内部類,或者命名類,并且傳遞類的一個執行個體到Spark。
  • 在Java8中,使用lambda表達式來簡明地定義函數的實作。

為了保持簡潔性,本指南中大量使用了lambda文法,這在長格式中很容易使用所有相同的APIs。比如,我們可以把上面的代碼寫成:

JavaRDD<String>  lines = sc.textFile("data.txt");
JavaRDD lineLengths = lines.map(new Function Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2 Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});      

同樣的功能,使用内聯式的實作顯得更為笨重繁瑣,代碼如下:

class GetLength implements Function Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2 Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD lines = sc.textFile("data.txt");
JavaRDD lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());      

注意,java中的内部匿名類,隻要帶有final關鍵字,就可以通路類範圍内的變量。Spark也會把變量複制到每一個worker節點。

4.3.3 了解閉包(Understanding closures)

使用Spark的一個難點為:了解程式在叢集中執行時變量和方法的生命周期。RDD操作可以在變量範圍之外修改變量,這是一個經常導緻迷惑的地方。比如下面的例子,使用​

​foreach()​

​方法增加計數器(counter)的值(類似的情況,在其他的RDD操作中經常出現)。

4.3.3.1 示例(Example)

參考下面簡單的RDD元素求和示例,求和運算是否在同一個JVM中執行,其複雜度也不同。Spark可以在​

​local​

​模式下(​

​--master = local[n]​

​)執行應用,也可以将該Spark應用送出到叢集上執行(例如通過​

​spark-submit​

​送出到YARN):

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)      
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);      

上面是錯誤的,應該使用累加器,下面有說明。

4.3.3.2 本地模式 VS 叢集模式(Local vs. cluster modes)

在本地模式下僅有一個JVM,上面的代碼将直接計算RDD中元素和,并存儲到counter中。此時RDD和變量counter都在driver節點的同一記憶體空間中。

然而,在叢集模式下,情況會變得複雜,上面的代碼并不會按照預期的方式執行。為了執行這個job,Spark把處理RDD的操作分割成多個任務,每個任務将被一個executor處理。在執行之前,Spark首先計算閉包(closure)。閉包是必須對executor可見的變量和方法,在對RDD進行運算時将會用到這些變量和方法(在本例子中指foreach())。這個閉包會被序列化,并發送給每個executor。在local模式下,隻有一個executor,是以所有的變量和方法都使用同一個閉包。在其他模式下情況跟local模式不一樣,每個executor在不同的worker節點上運作,每個executor都有一個單獨的閉包。

在這裡,發送給每個executor的閉包内的變量是目前變量的副本,是以當counter在foreach中被引用時,已經不是在driver節點上的counter了。在driver節點的記憶體中仍然有一個counter,但這個counter對executors不可見。executor隻能操作序列化的閉包中的counter副本。是以,最終counter的值仍然是0,因為所有對counter的操作都是在序列化的閉包内的counter上進行的。

在類似這種場景下,為了保證良好的行為確定,應該使用累加器。Spark中的累加器專門為在叢集中多個節點間更新變量提供了一種安全機制。在本手冊的累加器部分将對累加器進行詳細介紹。

一般情況下,像環或本地定義方法這樣的閉包結構,不應該用于更改全局狀态。Spark不定義也不保證來自閉包外引用導緻的對象變化行為。有些情況下,在local模式下可以正常運作的代碼,在分布式模式下也許并不會像預期那樣執行。在分布式下運作時,建議使用累加器定義一些全局集合。

4.3.3.3 列印RDD的元素(Printing elements of an RDD)

列印一個RDD的元素也是一個常用的文法,帶引RDD元素可以使用方法​

​rdd.foreach(println)​

​或​

​rdd.map(println)​

​。在本地模式下,該方法将生成預期的輸出并列印RDD所有的元素。然而,在叢集模式下各個executor調用stdout,将結果列印到executor的stdout中。因為不是列印到driver節點上,是以在driver節點的stdout上不會看到這些輸出。如果想将RDD的元素列印到driver節點上,可以使用​

​collect()​

​方法将RDD發送到driver節點上,然後再列印該RDD:​

​rdd.collect().foreach(println)​

​。這個操作可能會導緻driver節點記憶體不足,因為​

​collect()​

​方法将RDD全部的資料都發送到一台節點上。如果僅僅列印RDD的部分元素,一個安全的方法是使用​

​take()​

​方法:​

​rdd.take(100).foreach(println)​

4.3.4 操作鍵值對(Working with Key­Value Pairs)

Spark大部分的RDD操作都是對任意類型的對象的,但是,有部分特殊的操作僅支援對鍵值對的RDD進行操作。最常用的是分布式“shuffle”操作,比如按照key将RDD的元素進行分組或聚集操作。

  • 在Scala中,包含Tuple2對象在内的RDD鍵值對操作,都是可以自動可用的(Tuple2對象是Scala語言内置的元組類型,可以通過簡單的編寫進行​

    ​(a,b)​

    ​建立)。鍵值對操作接口在​

    ​PairRDDFunctions​

    ​類中,該類中的接口自動使用RDD的元組。

    例如,在下面的代碼中使用​

    ​reduceByKey​

    ​操作對鍵值對進行計數,計算每行的文本出現的次數:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)      
  • 在Java中,鍵值對使用的是​

    ​scala.Tuple2​

    ​類。使用者可以使用特定的map操作将JavaRDDs轉換為JavaPairRDDs,例如​

    ​mapToPair​

    ​flatMapToPair​

    ​。JavaPairRDD擁有标準RDD和特殊鍵值對的方法。

    ​reduceByKey​

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);      

我們還可以使用​

​counts.sortByKey()​

​按照字母順序将鍵值對排序,使用​

​counts.collect()​

​将結果以一個數組的形式發送給driver節點。

注意,當在鍵值對操作中使用自定義對象作為key時,你必須保證自定義的​

​equals()​

​方法有一個對應的​

​hashCode()​

​方法。詳細的細節,請閱讀​​Object.hashCode() documentation​​。

4.3.5 Transformations

下面列出了Spark常用的transformation操作。詳細的細節請參考RDD API文檔(​​Scala​​、​​Java​​、​​Python​​、​​R​​)和鍵值對RDD方法文檔(​​Scala​​、​​Java​​)。

  • map(func)

    将原來RDD的每個資料項,使用map中使用者自定義的函數func進行映射,轉變為一個新的元素,并傳回一個新的RDD。

  • filter(func)

    使用函數func對原RDD中資料項進行過濾,将符合func中條件的資料項組成新的RDD傳回。

  • flatMap(func)

    類似于map,但是輸入資料項可以被映射到0個或多個輸出資料集合中,是以函數func的傳回值是一個資料項集合而不是一個單一的資料項。

  • mapPartitions(func)

    類似于map,但是該操作是在每個分區上分别執行,是以當操作一個類型為T的RDD時func的格式必須是

    Iterator<T> => Iterator<U>

    。即mapPartitions需要擷取到每個分區的疊代器,在函數中通過這個分區的疊代器對整個分區的元素進行操作。
  • mapPartitionsWithIndex(func)

    類似于mapPartitions,但是需要提供給func一個整型值,這個整型值是分區的索引,是以當處理T類型的RDD時,func的格式必須為

    (Int, Iterator<T>) => Iterator<U>

  • sample(withReplacement, fraction, seed)

    對資料采樣。使用者可以設定是否有放回(withReplacement)、采樣的百分比(fraction)、随機種子(seed)。

  • union(otherDataset)

    傳回原資料集和參數指定的資料集合并後的資料集。使用union函數時需要保證兩個RDD元素的資料類型相同,傳回的RDD資料類型和被合并的RDD元素資料類型相同。該操作不進行去重操作,傳回的結果會儲存所有元素。如果想去重,可以使用distinct()。

  • intersection(otherDataset)

    傳回兩個資料集的交集。

  • distinct([numTasks]))

    将RDD中的元素進行去重操作。

  • groupByKey([numTasks])

    操作(K,V)格式的資料集,傳回 (K, Iterable)格式的資料集。

    注意,如果分組是為了按key進行聚合操作(例如,計算sum、average),此時使用

    reduceByKey

    aggregateByKey

    計算效率會更高。

    注意,預設情況下,并行情況取決于父RDD的分區數,但可以通過參數

    numTasks

    來設定任務數。
  • reduceByKey(func, [numTasks])

    使用給定的func,将(K,V)對格式的資料集中key相同的值進行聚集,其中func的格式必須為(V,V) => V。可選參數numTasks可以指定reduce任務的數目。

  • aggregateByKey(zeroValue)(seqOp, combOp,[numTasks])

    對(K,V)格式的資料按key進行聚合操作,聚合時使用給定的合并函數和一個初試值,傳回一個(K,U)對格式資料。需要指定的三個參數:zeroValue為在每個分區中,對key值第一次讀取V類型的值時,使用的U類型的初始變量;seqOp用于在每個分區中,相同的key中V類型的值合并到zeroValue建立的U類型的變量中。combOp是對重新分區後兩個分區中傳入的U類型資料的合并函數。

  • sortByKey([ascending], [numTasks])

    (K,V)格式的資料集,其中K已實作了Ordered,經過sortByKey操作傳回排序後的資料集。指定布爾值參數ascending來指定升序或降序排列。

  • join(otherDataset, [numTasks])

    用于操作兩個鍵值對格式的資料集,操作兩個資料集(K,V)和(K,W)傳回(K, (V, W))格式的資料集。通過

    leftOuterJoin

    rightOuterJoin

    fullOuterJoin

    完成外連接配接操作。
  • cogroup(otherDataset, [numTasks])

    用于操作兩個鍵值對格式資料集(K,V)和(K,W),傳回資料集格式為 (K,(Iterable, Iterable)) 。這個操作也稱為

    groupWith

    。對在兩個RDD中的Key-Value類型的元素,每個RDD相同Key的元素分别聚合為一個集合,并且傳回兩個RDD中對應Key的元素集合的疊代器。
  • cartesian(otherDataset)

    對類型為T和U的兩個資料集進行操作,傳回包含兩個資料集所有元素對的(T,U)格式的資料集。即對兩個RDD内的所有元素進行笛卡爾積操作。

  • pipe(command, [envVars])

    以管道(pipe)方式将 RDD的各個分區(partition)使用 shell指令處理(比如一個 Perl或 bash腳本)。 RDD的元素會被寫入程序的标準輸入(stdin),将程序傳回的一個字元串型 RDD(RDD of strings),以一行文本的形式寫入程序的标準輸出(stdout)中。

  • coalesce(numPartitions)

    把RDD的分區數降低到通過參數numPartitions指定的值。在得到的更大一些資料集上執行操作,會更加高效。

  • repartition(numPartitions)

    随機地對RDD的資料重新洗牌(Reshuffle),進而建立更多或更少的分區,以平衡資料。總是對網絡上的所有資料進行洗牌(shuffles)。

  • repartitionAndSortWithinPartitions(partitioner)

    根據給定的分區器對RDD進行重新分區,在每個結果分區中,按照key值對記錄排序。這在每個分區中比先調用repartition再排序效率更高,因為它可以将排序過程在shuffle操作的機器上進行。

4.3.6 Actions

下面列出了Spark支援的常用的action操作。詳細請參考RDD API文檔(​​Scala​​、​​Java​​、​​Python​​、​​R​​)和鍵值對RDD方法文檔(​​Scala​​、​​Java​​)。

  • reduce(func)

    使用函數func聚集資料集中的元素,這個函數func輸入為兩個元素,傳回為一個元素。這個函數應該符合結合律和交換了,這樣才能保證資料集中各個元素計算的正确性。

  • collect()

    在驅動程式中,以數組的形式傳回資料集的所有元素。通常用于filter或其它産生了大量小資料集的情況。

  • count()

    傳回資料集中元素的個數。

  • first()

    傳回資料集中的第一個元素(類似于

    take(1)

    )。
  • take(n)

    傳回資料集中的前n個元素。

  • takeSample(withReplacement,num, [seed])

    對一個資料集随機抽樣,傳回一個包含num個随機抽樣元素的數組,參數

    withReplacement

    指定是否有放回抽樣,參數

    seed

    指定生成随機數的種子。
  • takeOrdered(n, [ordering])

    傳回RDD按自然順序或自定義順序排序後的前n個元素。

  • saveAsTextFile(path)

    将資料集中的元素以文本檔案(或文本檔案集合)的形式儲存到指定的本地檔案系統、HDFS或其它Hadoop支援的檔案系統中。Spark将在每個元素上調用

    toString

    方法,将資料元素轉換為文本檔案中的一行記錄。
  • saveAsSequenceFile(path) (Java and Scala)

    将資料集中的元素以Hadoop Sequence檔案的形式儲存到指定的本地檔案系統、HDFS或其它Hadoop支援的檔案系統中。該操作隻支援對實作了Hadoop的

    Writable

    接口的鍵值對RDD進行操作。在Scala中,還支援隐式轉換為Writable的類型(Spark包括了基本類型的轉換,例如Int、Double、String等等)。
  • saveAsObjectFile(path) (Java and Scala)

    将資料集中的元素以簡單的Java序列化的格式寫入指定的路徑。這些儲存該資料的檔案,可以使用SparkContext.objectFile()進行加載。

  • countByKey()

    僅支援對(K,V)格式的鍵值對類型的RDD進行操作。傳回(K,Int)格式的Hashmap,(K,Int)為每個key值對應的記錄數目。

  • foreach(func)

    對資料集中每個元素使用函數func進行處理。該操作通常用于更新一個累加器(Accumulator)或與外部資料源進行互動。注意:在foreach()之外修改累加器變量可能引起不确定的後果。詳細介紹請閱讀Understanding closures部分。

4.3.7 Shuffle操作(Shuffle operations)

Spark内的一個操作将會觸發shuffle事件。shuffle是Spark将多個分區的資料重新分組重新分布資料的機制。shuffle是一個複雜且代價較高的操作,它需要完成将資料在executor和機器節點之間進行複制的工作。

4.3.7.1 背景(Background)

通過​

​reduceByKey​

​操作的例子,來了解shuffle過程。​

​reduceByKey​

​操作生成了一個新的RDD,原始資料中相同key的所有記錄的聚合值合并為一個元組,這個元組中的key對應的值為執行reduce函數之後的結果。這個操作的挑戰是,key相同的所有記錄不在同一個分區中,甚至不在同一台機器上,但是該操作必須将這些記錄聯合運算。

在Spark中,通常一條資料不會跨分區分布,除非為了一個特殊的操作在必要的地方才會跨分區分布。在計算過程中,一個分區由一個task進行處理。是以,為了組織所有的資料讓一個reduceByKey任務執行,Spark需要進行一個all-to-all操作。all-to-all操作需要讀取所有分區上的資料的所有的key,以及key對應的所有的值,然後将多個分區上的資料進行彙總,并将每個key對應的多個分區的資料進行計算得出最終的結果,這個過程稱為shuffle。

雖然每個分區中新shuffle後的資料元素是确定的,分區間的順序也是确定的,但是所有的元素是無序的。如果想在shuffle操作後将資料按指定規則進行排序,可以使用下面的方法:

  • 使用​

    ​mapPartitions​

    ​操作在每個分區上進行排序,排序可以使用​

    ​.sorted​

    ​等方法。
  • ​repartitionAndSortWithinPartitions​

    ​操作在重新分區的同時高效的對分區進行排序。
  • ​sortBy​

    ​将RDD進行排序。

會引起shuffle過程的操作有:

  • ​repartition​

    ​操作,例如:​

    ​repartition​

    ​coalesce​

  • ​ByKey​

    ​操作(除了counting相關操作),例如:​

    ​groupByKey​

    ​reduceByKey​

  • ​join​

    ​cogroup​

    ​join​

4.3.7.2 性能影響(Performance Impact)

shuffle是一個代價比較高的操作,它涉及磁盤IO、資料序列化、網絡IO。為了準備shuffle操作的資料,Spark啟動了一系列的map任務和reduce任務,map任務完成資料的處理工作,reduce完成map任務處理後的資料的收集工作。這裡的map、reduce來自MapReduce,跟Spark的​

​map​

​操作和​

​reduce​

​操作沒有關系。

在内部,一個map任務的所有結果資料會儲存在記憶體,直到記憶體不能全部存儲為止。然後,這些資料将基于目标分區進行排序并寫入一個單獨的檔案中。在reduce時,任務将讀取相關的已排序的資料塊。

某些shuffle操作會大量消耗堆記憶體空間,因為shuffle操作在資料轉換前後,需要在使用記憶體中的資料結構對資料進行組織。需要特别說明的是,​

​reduceByKey​

​aggregateByKey​

​在map時會建立這些資料結構,​

​ByKey​

​操作在reduce時建立這些資料結構。當記憶體滿的時候,Spark會把溢出的資料存到磁盤上,這将導緻額外的磁盤IO開銷和垃圾回收開銷的增加。

shuffle操作還會在磁盤上生成大量的中間檔案。在Spark 1.3中,這些檔案将會保留至對應的RDD不在使用并被垃圾回收為止。這麼做的好處是,如果在Spark重新計算RDD的血統關系(lineage)時,shuffle操作産生的這些中間檔案不需要重新建立。如果Spark應用長期保持對RDD的引用,或者垃圾回收不頻繁,這将導緻垃圾回收的周期比較長。這意味着,長期運作Spark任務可能會消耗大量的磁盤空間。臨時資料存儲路徑可以通過SparkContext中設定參數​

​spark.local.dir​

​進行配置。

shuffle操作的行為可以通過調節多個參數進行設定。詳細的說明請看​​Configuration Guide​​中的“Shuffle Behavior”部分。

4.4 RDD持久化(RDD Persistence)

Spark中一個很重要的能力是将資料持久化(或稱為緩存),在多個操作間都可以通路這些持久化的資料。當持久化一個RDD時,每個節點會将本節點計算的資料塊存儲到記憶體,在該資料上的其他action操作将直接使用記憶體中的資料。這樣會讓以後的action操作計算速度加快(通常運作速度會加速10倍)。緩存是疊代算法和快速的互動式使用的重要工具。

RDD可以使用​

​persist()​

​cache()​

​方法進行持久化。資料将會在第一次action操作時進行計算,并在各個節點的記憶體中緩存。Spark的緩存具有容錯機制,如果一個緩存的RDD的某個分區丢失了,Spark将按照原來的計算過程,自動重新計算并進行緩存。

另外,每個持久化的RDD可以使用不同的存儲級别進行緩存,例如,持久化到磁盤、已序列化的Java對象形式持久化到記憶體(可以節省空間)、跨節點間複制、以off-heap的方式存儲在 Tachyon。這些存儲級别通過傳遞一個​

​StorageLevel​

​對象(​​Scala​​、​​Java​​、​​Python​​)給​

​persist()​

​方法進行設定。​

​cache()​

​方法是使用預設存儲級别的快捷設定方法,預設的存儲級别是​

​StorageLevel.MEMORY_ONLY​

​(将反序列化的對象存儲到記憶體中)。詳細的存儲級别介紹如下:

  • MEMORY_ONLY:将RDD以反序列化Java對象的形式存儲在JVM中。如果記憶體空間不夠,部分資料分區将不再緩存,在每次需要用到這些資料時重新進行計算。這是預設的級别。
  • MEMORY_AND_DISK:将RDD以反序列化Java對象的形式存儲在JVM中。如果記憶體空間不夠,将未緩存的資料分區存儲到磁盤,在需要使用這些分區時從磁盤讀取。
  • MEMORY_ONLY_SER:将RDD以序列化的Java對象的形式進行存儲(每個分區為一個byte數組)。這種方式會比反序列化對象的方式節省很多空間,尤其是在使用​​fast serializer​​時會節省更多的空間,但是在讀取時會增加CPU的計算負擔。
  • MEMORY_AND_DISK_SER:類似于MEMORY_ONLY_SER,但是溢出的分區會存儲到磁盤,而不是在用到它們時重新計算。
  • DISK_ONLY:隻在磁盤上緩存RDD。
  • MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等:與上面的級别功能相同,隻不過每個分區在叢集中兩個節點上建立副本。
  • OFF_HEAP (實驗中):以序列化的格式 (serialized format) 将 RDD存儲到 ​​Tachyon​​。相比于MEMORY_ONLY_SER, OFF_HEAP 降低了垃圾收集(garbage collection)的開銷,使得 executors變得更小,而且共享了記憶體池,在使用大堆(heaps)和多應用并行的環境下有更好的表現。此外,由于 RDD存儲在Tachyon中, executor的崩潰不會導緻記憶體中緩存資料的丢失。在這種模式下, Tachyon中的記憶體是可丢棄的。是以,Tachyon不會嘗試重建一個在記憶體中被清除的分塊。如果你打算使用Tachyon進行off heap級别的緩存,Spark與Tachyon目前可用的版本相相容。詳細的版本配對使用建議請參考​​Tachyon的說明​​。

注意,在Python中,緩存的對象總是使用​​Pickle​​進行序列化,是以在Python中不關心你選擇的是哪一種序列化級别。

在shuffle操作中(例如​

​reduceByKey​

​),即便是使用者沒有調用​

​persist​

​方法,Spark也會自動緩存部分中間資料。這麼做的目的是,在shuffle的過程中某個節點運作失敗時,不需要重新計算所有的輸入資料。如果使用者想多次使用某個RDD,強烈推薦在該RDD上調用​

​persist​

​方法。

4.4.1 如何選擇存儲級别(Which Storage Level to Choose?)

Spark的存儲級别的選擇,核心問題是在記憶體使用率和CPU效率之間進行權衡。建議按下面的過程進行存儲級别的選擇:

  • 如果使用預設的存儲級别(MEMORY_ONLY),存儲在記憶體中的RDD沒有發生溢出,那麼就選擇預設的存儲級别。預設存儲級别可以最大程度的提高CPU的效率,可以使在RDD上的操作以最快的速度運作。
  • 如果記憶體不能全部存儲RDD,那麼使用MEMORY_ONLY_SER,并​​挑選一個快速序列化庫​​将對象序列化,以節省記憶體空間。使用這種存儲級别,計算速度仍然很快。
  • 除了在計算該資料集的代價特别高,或者在需要過濾大量資料的情況下,盡量不要将溢出的資料存儲到磁盤。因為,重新計算這個資料分區的耗時與從磁盤讀取這些資料的耗時差不多。
  • 如果想快速還原故障,建議使用多副本存儲界别(例如,使用Spark作為web應用的背景服務,在服務出故障時需要快速恢複的場景下)。所有的存儲級别都通過重新計算丢失的資料的方式,提供了完全容錯機制。但是多副本級别在發生資料丢失時,不需要重新計算對應的資料庫,可以讓任務繼續運作。
  • 在高記憶體消耗或者多任務的環境下,還處于實驗性的OFF_HEAP模式有下列幾個優勢:
  • 它支援多個executor使用Tachyon中的同一個記憶體池。
  • 它顯著減少了記憶體回收的代價。
  • 如果個别executor崩潰掉,緩存的資料不會丢失。

4.4.2 移除資料(Removing Data)

Spark自動監控各個節點上的緩存使用率,并以最近最少使用的方式(LRU)将舊資料塊移除記憶體。如果想手動移除一個RDD,而不是等待該RDD被Spark自動移除,可以使用​

​RDD.unpersist()​

5 共享變量(Shared Variables)

通常情況下,一個傳遞給Spark操作(例如​

​map​

​reduce​

​)的方法是在遠端叢集上的節點執行的。方法在多個節點執行過程中使用的變量,是同一份變量的多個副本。這些變量的以副本的方式拷貝到每個機器上,各個遠端機器上變量的更新并不會傳回driver程式。然而,為了滿足兩種常見的使用場景,Spark提供了兩種特定類型的共享變量:廣播變量(broadcast variables)和累加器(accumulators)。

5.1 廣播變量(broadcast variables)

廣播變量允許程式設計者将一個隻讀變量緩存到每台機器上,而不是給每個任務傳遞一個副本。例如,廣播變量可以用一種高效的方式給每個節點傳遞一份比較大的資料集副本。在使用廣播變量時,Spark也嘗試使用高效廣播算法分發變量,以降低通信成本。

Spark的action操作是通過一些列的階段(stage)進行執行的,這些階段(stage)是通過分布式的shuffle操作進行切分的。Spark自動廣播在每個階段内任務需要的公共資料。這種情況下廣播的資料使用序列化的形式進行緩存,并在每個任務在運作前進行反序列化。這明确說明了,隻有在跨越多個階段的多個任務任務會使用相同的資料,或者在使用反序列化形式的資料特别重要的情況下,使用廣播變量會有比較好的效果。

廣播變量通過在一個變量​

​v​

​上調用​

​SparkContext.broadcast(v)​

​方法進行建立。廣播變量是​

​v​

​的一個封裝器,可以通過​

​value​

​方法通路​

​v​

​的值。代碼示例如下:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)      
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]      

廣播變量建立之後,在叢集上執行的所有的函數中,應該使用該廣播變量代替原來的v值。是以,每個節點上的v最多分發一次。另外,對象v在廣播後不應該再被修改,以保證分發到所有的節點上的廣播變量有同樣的值(例如,在分發廣播變量之後,又對廣播變量進行了修改,然後又需要将廣播變量分發到新的節點)。

5.2 累加器(Accumulators)

累加器隻允許關聯操作進行"added"操作,是以在并行計算中可以支援特定的計算。累加器可以用于實作計數(類似在MapReduce中那樣)或者求和。原生Spark支援數值型的累加器,程式設計者可以添加新的支援類型。建立累加器并命名之後,在Spark的UI界面上将會顯示該累加器。這樣可以幫助了解正在運作的階段的運作情況(注意,在Python中還不支援)。

一個累加器可以通過在原始值v上調用​

​SparkContext.accumulator(v)​

​。然後,叢集上正在運作的任務就可以使用​

​add​

​+=​

​操作對該累加器進行累加操作。隻有driver程式可以讀取累加器的值,讀取累加器的值使用​

​value​

下面代碼将數組中的元素進行求和:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10      
Accumulator<Integer> accum = sc.accumulator(0);

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10      

上面的代碼示例使用的是Spark内置的Int類型的累加器,開發者可以通過內建​​AccumulatorParam​​類建立新的累加器類型。AccumulatorParam接口有兩個方法:​

​zero​

​方法和​

​addInPlace​

​方法。​

​zero​

​方法給資料類型提供了一個0值,​

​addInPlace​

​方法能夠将兩個值進行累加。例如,假設我們有一個表示數學上向量的​

​Vector​

​類,我們可以寫成:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)      
class VectorAccumulatorParam implements AccumulatorParam<Vector> {
  public Vector zero(Vector initialValue) {
    return Vector.zeros(initialValue.size());
  }
  public Vector addInPlace(Vector v1, Vector v2) {
    v1.addInPlace(v2); return v1;
  }
}

// Then, create an Accumulator of this type:
Accumulator<Vector> vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());      

Spark也支援使用更通用的 Accumulable接口去累加資料,其結果資料的類型和累加的元素類型不同(例如,通過收集資料元素建立一個list)。在Scala中,​

​SparkContext.accumulableCollection​

​方法可用于累加常用的Scala集合類型。

累加器的更新隻發生在action操作中,Spark保證每個任務隻能更新累加器一次,例如重新啟動一個任務,該重新開機的任務不允許更新累加器的值。在transformation使用者需要注意的是,如果任務過job的階段重新執行,每個任務的更新操作将會執行多次。

累加器沒有改變Spark懶執行的模式。如果累加器在RDD中的一個操作中進行更新,該累加器的值隻在該RDD進行action操作時進行更新。是以,在一個像​

​map()​

​這樣的轉換操作中,累加器的更新并沒有執行。下面的代碼片段證明了這個特性:

val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// Here, accum is still 0 because no actions have caused the <code>map</code> to be computed.      
Accumulator<Integer> accum = sc.accumulator(0);
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.      

6 将應用送出到叢集(Deploying to a Cluster)

​​應用送出手冊​​描述了如何将應用送出到叢集。簡單的說,當你将你的應用打包成一個JAR(Java/Scala)或者一組​

​.py​

​.zip​

​檔案(Python)後,就可以通過​

​bin/spark-submit​

​腳本将腳本送出到叢集支援的管理器中。

 更多關于spark-submit見《​​spark送出模式​​》

7 Java/Scala中啟動Spark作業(Launching Spark jobs from Java / Scala)

使用​​org.apache.spark.launcher​​包提供的簡單的Java API,可以将Spark作業以該包中提供的類的子類的形式啟動。

8 單元測試(Unit Testing)

Spark可以友好的使用流行的單元測試架構進行單元測試。在test中簡單的建立一個​

​SparkContext​

​,master的URL設定為​

​local​

​,運作幾個操作,然後調用​

​SparkContext.stop()​

​将該作業停止。因為Spark不支援在同一個程式中運作兩個context,是以需要請確定使用​

​finally​

​塊或者測試架構的​

​tearDown​

​方法将context停止。

9 從Spark1.0之前的版本遷移(Migrating from pre­1.0 Versions of Spark)

Spark 1.0當機了1.X系列的Spark核的API,是以,目前沒有标記為"experimental"或者“developer API”的API都将在未來的版本中進行支援。

  • Scala的變化

對于Scala的變化是,分組操作(例如​

​groupByKey​

​cogroup​

​join​

​)的傳回類型由​

​(Key,Seq[Value])​

​變為​

​(Key,Iterable[Value])​

  • Java API的變化
  • 1.0中​

    ​org.apache.spark.api.java.function​

    ​類中的​

    ​Function​

    ​類變成了接口,這意味着舊的代碼中​

    ​extends Function​

    ​應該改為​

    ​implement Function​

  • 增加了新的​

    ​map​

    ​型操作,例如​

    ​mapToPair​

    ​mapToDouble​

    ​,增加的這些操作可用于建立特殊類型的RDD。
  • 分組操作(例如​

    ​groupByKey​

    ​cogroup​

    ​join​

    ​(Key,Seq[Value])​

    ​(Key,Iterable[Value])​

這些遷移指導對Spark Streaming、MLlib和GraphX同樣有效。

10 下一步(Where to Go from Here)

你可以在Spark網站看一些​​Spark程式設計示例​​。另外,Spark在​

​examples​

​目錄下包含了許多例子(​​Scala​​、​​Java​​、​​Python​​、​​R​​)。運作Java和Scala例子,可以通過将例子的類名傳給Spark的​

​bin/run-example​

​腳本進行啟動。例如:

./bin/run-example SparkPi      

Python示例,使用​

​spark-submit​

​指令送出:

./bin/spark-submit examples/src/main/python/pi.py      

R示例,使用​

​spark-submit​

./bin/spark-submit examples/src/main/r/dataframe.R