天天看點

Spark 2.2.1 官方文檔翻譯 RDD程式設計指南(RDD Programming Guide)概覽通過編寫應用程式使用Spark通過Spark-Shell使用Spark彈性分布式資料集(RDD)共享變量單元測試

http://spark.apache.org/docs/latest/rdd-programming-guide.html
  • 概覽
  • 通過編寫應用程式使用Spark
    • 連結到Spark
    • 初始化Spark
  • 通過Spark-Shell使用Spark
  • 彈性分布式資料集RDD
    • parallelizing 集合
    • 外部資料集
    • RDD操作
      • 基本操作
      • 将函數傳遞給Spark
      • 了解閉包
        • 例子
        • 本地或叢集模式
        • 列印RDD的元素
      • 使用Key-Value對
      • Transformations
      • Actions
      • Shuffle 操作
        • 背景
        • 對性能的影響
    • RDD持久化
      • 選擇哪個存儲級别
      • 移除資料
  • 共享變量
    • 廣播變量
    • 累加器
  • 單元測試

概覽

在上層運作中,每個Spark應用程式都包含一個Driver程式,該程式運作使用者的main函數,并在叢集上執行各種并行操作。

Spark提供的主要抽象是一個彈性分布式資料集(RDD),一個可以在群集節點上并行操作的離散分布的資料集。

RDD的來源有3種:

1. 檔案

2. 集合(一種資料結構)

3. 其他RDD轉換得來

使用者也可以要求Spark将RDD儲存在記憶體中,以便在并行操作中有效地重用它。最後,RDD可以自動從節點故障中恢複。

Spark中的第二個抽象是可用于并行操作的共享變量。預設情況下,Spark在不同節點上并行執行一組任務時,會将該函數中使用的每個變量的副本傳送給每個任務。有時候,變量需要在任務之間,或任務與驅動程式之間共享。

Spark支援兩種類型的共享變量:廣播變量,可用于在所有節點上緩存記憶體中的值,以及累加器,常用于計算次數或某些數值的和。

本指南顯示了Spark支援的各種語言中的每個功能。使用Spark-Shell可以更深刻了解本指南的内容。

通過編寫應用程式使用Spark

連結到Spark

Spark 預設使用Scala 2.11,要在Scala中編寫應用程式,您需要使用相容的Scala版本(例如2.11.X)。

要編寫Spark應用程式,您需要在Spark上添加Maven依賴項。 Spark可以通過Maven Central獲得:

groupId = org.apache.spark
artifactId = spark-core_2.
version = .
           

另外,如果你想通路一個HDFS叢集,你需要為你的HDFS版本添加對hadoop-client的依賴。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
           

最後,您需要将一些Spark類導入到您的程式中。添加以下行:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
           
在Spark 1.3.0之前,您需要導入org.apache.spark.SparkContext._以啟用基本的隐式轉換

初始化Spark

Spark程式必須做的第一件事就是建立一個SparkContext對象,它告訴Spark如何通路一個叢集。要建立一個SparkContext,首先需要建構一個包含有關應用程式資訊的SparkConf對象。

每個JVM隻能有一個SparkContext處于活動狀态。在建立一個新的SparkContext之前,必須先停止()活動的SparkContext。
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
           

appName參數是您的應用程式在叢集UI上顯示的名稱。master是Spark,Mesos或YARN群集URL,或者是以本地模式運作的特殊“本地”字元串。實際上,在群集上運作時,您不希望在程式中寫死master,而是使用spark-submit啟動應用程式,并在那裡接收它。但是,對于本地測試和單元測試,可以設定為“local”來運作程序中的Spark。

通過Spark-Shell使用Spark

在Spark shell中已經為您建立了一個SparkContext,名為sc。制作自己的SparkContext将不起作用。您可以使用–master參數來設定上下文所連接配接的host,并且可以通過将逗号分隔清單傳遞給–jars參數來将JAR添加到類路徑中。您還可以通過向–packages參數提供逗号分隔的Maven坐标清單,将相關依賴(例如Spark包)添加到shell會話中。

例如,要在四個核心上運作bin / spark-shell,請使用:

或者,也可以将code.jar添加到其類路徑中,請使用:

要添加Maven依賴項:

運作spark-shell –help 可以看到有關選項的完整清單。在底層,spark-shell調用spark-submit腳本。

彈性分布式資料集(RDD)

Spark的工作圍繞彈性分布式資料集(RDD)的概念展開,RDD是可以并行操作的容錯元素集合。

有兩種方法可以建立RDD:

  • parallelizing Driver中的現有集合

parallelizing 集合

parallelizing 集合是通過調用驅動程式上的SparkContext的parallelize方法來建立的。集合的元素被複制以形成可以并行操作的分布式資料集。

例如,下面是如何建立一個包含數字1到5的并行化集合:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
           

一旦建立,分布式資料集(distData)可以進行并行操作。例如,我們可以調用distData.reduce((a,b)=> a + b)來合計數組的元素。我們稍後介紹更多分布式資料集上的操作。

并行集合的一個重要參數是要将資料集剪切成的分區數量。Spark将為群集的每個分區運作一個Task。通常情況下,您需要為群集中的每個CPU配置設定2-4個分區。通常情況下,Spark會嘗試根據您的群集自動設定分區數量。但是,您也可以通過設定parallelize的第二個參數(例如sc.parallelize(data,10))進行手動設定。

外部資料集

Spark可以從Hadoop支援的任何存儲源(包括本地檔案系統,HDFS,Cassandra,HBase,Amazon S3等)建立分布式資料集.Spark支援文本檔案,SequenceFile和任何其他Hadoop InputFormat。

文本檔案的RDD可以使用SparkContext的textFile方法建立。這個方法接受一個檔案的URI(機器上的一個本地路徑,或者一個hdfs://,s3n://等URI),并把它作為一個行集合來讀取。

這是一個示例調用:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[] at textFile at <console>:
           

一旦建立,distFile可以通過資料集操作進行操作。例如,我們可以使用map來獲得所有行的大小,并進行Reduce操作獲得總大小,如下所示:distFile.map(s => s.length).reduce((a,b)=> a + b)。

使用Spark讀取檔案的一些注意事項:

  • 如果在本地檔案系統上使用路徑,則該檔案也必須可以在Worker上的相同路徑上通路。通過将檔案複制到所有工作節點或使用網絡共享檔案系統。
  • Spark的所有基于檔案的輸入方法(包括textFile)都支援在目錄,壓縮檔案和通配符上運作。例如,您可以使用textFile(“/ my / directory”),textFile(“/ my / directory / 。txt”)和textFile(“/ my / directory / 。gz”)。
  • textFile方法還使用可選的第二個參數來控制檔案的分區數量。
  • 預設情況下,Spark為檔案的每個塊建立一個分區(HDFS中的塊預設為128MB),但是您也可以通過傳遞更大的值來請求更多的分區。請注意,您不能有比塊更少的分區。

除了文本檔案外,Spark的Scala API還支援其他幾種資料格式:

  • SparkContext.wholeTextFiles讓你讀取一個包含多個小文本檔案的目錄,并将它們作為(檔案名,内容)對傳回。這與textFile相反,textFile将在每個檔案中每行傳回一個記錄。分區由資料局部性決定,在某些情況下可能導緻分區太少。對于這些情況,wholeTextFiles提供了一個可選的第二個參數來控制分區的最小數量。
  • 對于SequenceFiles,使用SparkContext的sequenceFile [K,V]方法,其中K和V是檔案中的鍵和值的類型。這些應該是Hadoop的Writable接口的子類,如IntWritable和Text。另外,Spark允許您為幾個常見Writable指定類型;例如,sequenceFile [Int,String]将自動讀取IntWritables和Texts。
  • 對于其他Hadoop InputFormats,可以使用SparkContext.hadoopRDD方法,該方法采用任意的JobConf和輸入格式類,key類和value類。将它們設定為您使用輸入源進行Hadoop作業的方式相同。您還可以使用SparkContext.newAPIHadoopRDD for InputFormats基于“新”MapReduce API。
  • RDD.saveAsObjectFile和SparkContext.objectFile支援以包含序列化Java對象的簡單格式儲存RDD。雖然這不像Avro這樣的專業格式,但它提供了一種簡單的方法來儲存任何RDD。

RDD操作

RDD支援兩種類型的操作:transformations(從現有資料集建立新資料集)和actions(在資料集上運作計算後将值傳回給驅動程式)。

例如,map是一個通過函數傳遞每個資料集元素的transformation,并傳回一個代表結果的新RDD。另一方面,reduce是一個Action,它使用某個函數聚合RDD的所有元素,并将最終結果傳回給驅動程式(還有一個并行reduceByKey函數傳回一個分布式資料集)。

Spark中的所有transformations都是懶加載的,它們不會馬上計算結果。他們隻記住應用于某些基礎資料集(例如檔案)的Transformation。隻有在Action需要将結果傳回給驅動程式時才會執行計算。這種設計使Spark能夠更高效地運作。例如,我們可以認識到通過map建立的資料集将被用于reduce,并且隻将reduce的結果傳回給驅動程式,而不是傳回更大的資料集。

預設情況下,每次對其執行操作時,每個已轉換的RDD都可能重新計算。但是,您也可以使用持久化(或緩存)方法将RDD保留在記憶體中,在這種情況下,Spark将該RDD,以便在下次查詢時快速通路。還支援在磁盤上持久化RDD,或在多個節點上複制RDD。

基本操作

為了說明RDD基礎知識,請考慮下面的簡單程式:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
           

第一行定義了來自外部檔案的基本RDD。這個資料集不會被加載到記憶體中,或者作用于其他行上:僅僅是一個指向檔案的指針。

第二行将lineLengths是對lines做一個Map轉換的結果,得到每一行的長度。同樣,lineLengths由于懶惰而沒有立即計算。

最後,我們運作reduce,這是一個Action。在這個時候,Spark将計算分解為在不同機器上運作的任務,每台機器既運作其Map部分任務又運作局部reduce任務,隻傳回運作結果給Driver。

如果我們還想稍後再使用lineLength,我們可以對其進行儲存:

lineLengths.persist()
           

在Reduce前,lineLengths RDD将會被儲存在記憶體中。

将函數傳遞給Spark

Spark的API在很大程度上需要将Driver中的函數傳遞到叢集上運作。有兩種建議的方法來做到這一點:

  • 匿名函數,可用于短小的代碼。
  • 全局單例對象中的靜态方法。例如,您可以在定義MyFunctions Object,然後傳遞MyFunctions.func1給Spark的Api,如下所示:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)
           

請注意,雖然也可以在類執行個體中傳遞對方法的引用(與單例對象相反),但這需要将包含該類的對象與方法一起發送。比如說:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
           

在這裡,如果我們建立一個新的MyClass執行個體,并在其上調用doStuff方法,那麼其中的映射會引用該MyClass執行個體的func1方法,是以需要将整個MyClass 對象發送到叢集。

以類似的方式,通路外部對象的字段将引用整個對象:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
           

相當于寫了rdd.map(x => this.field + x),它引用了this。為了避免這個問題,最簡單的方法是将字段複制到本地變量中,而不是從外部通路:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}
           

了解閉包

Spark的難點之一是在叢集中執行代碼時了解變量和方法的範圍和生命周期。

修改範圍之外的變量的RDD操作常常會給人們帶來混亂。在下面的例子中,我們将看看使用foreach()來增加計數器的代碼,裡面出現的問題在其他操作也會出現。

例子

考慮下面的RDD元素計算總和的操作,其執行是否發生在同一個JVM中,這可能會有不同的表現。一個常見的例子就是在本地模式下運作Spark(–master = local [n])與将Spark應用程式部署到叢集(例如,通過spark-submit to YARN):

var counter = 
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)
           

本地或叢集模式

上面的代碼的行為是未定義的,并可能無法正常工作。為了執行作業,Spark将RDD操作的處理分解為多個任務,每個任務由Executor執行。在執行之前,Spark計算任務的閉包。閉包是Executor在RDD上執行計算(在本例中為foreach()))時必須可見的那些變量和方法。這個計算後的閉包将會序列化并發送給每個執行者。

發送給每個執行程式的閉包中的變量現在是一個副本,是以,當在foreach函數中引用counter 時,它不再是驅動程式節點上的counter 。驅動程式節點的記憶體中還有一個counter ,但Executor引用不到它!Executor隻能看到序列化閉包的副本。是以,counter 的最終值仍然是零。

在本地模式下,在某些情況下,foreach函數實際上将在與驅動程式相同的JVM内執行,并将引用相同的原始計數器,并可能實際更新它。

為了確定在這種情況下明确的行為,應該使用Accumulators 。Spark中的Accumulators 專門用于提供一種在叢集中的工作節點之間執行拆分時安全地更新變量的機制。本指南的“Accumulators ”部分更詳細地讨論了這些内容。

一般來說,閉包 - 像循環或本地定義的方法這樣的構造不應該被用來改變一些全局狀态。Spark并沒有定義或保證對從閉包外引用的對象的突變行為。這樣做的一些代碼可能在本地模式下工作,但這是偶然的,這樣的代碼不會按預期在分布式模式下運作。如果需要全局聚合,請使用Accumulators 。

列印RDD的元素

另一個常見的習慣是試圖使用rdd.foreach(println)或rdd.map(println)列印RDD的元素。在單台機器上,這将生成預期的輸出并列印所有RDD的元素。但是,在叢集模式下,執行程式調用的stdout輸出現在寫入執行程式的stdout,而不是驅動程式的stdout,是以驅動程式上的stdout不會顯示這些!要列印驅動程式中的所有元素,可以使用collect()方法首先将RDD帶到驅動程式節點:rdd.collect().foreach(println)。但是,這可能會導緻驅動程式記憶體不足,因為collect()會将整個RDD提取到Driver上;您隻需要列印RDD的幾個元素,一個更安全的方法是使用take():rdd.take(100).foreach(println)。

使用Key-Value對

盡管大多數Spark操作在包含任何類型對象的RDD上工作,但是一些特殊操作僅在鍵 - 值對的RDD上可用。

最常見的是分布式的“shuffle”操作,如按key分組或聚合元素。

在Scala中,這些操作可以在包含Tuple2對象的RDD中自動使用(通過簡單寫入(a,b)建立的語言中的内置元組)。PairRDDFunctions類中提供了鍵值對操作,該類自動包裝元組的RDD。

例如,以下代碼使用鍵值對上的reduceByKey操作來計算文本中每行文本的出現次數:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, ))
val counts = pairs.reduceByKey((a, b) => a + b)
           

或者,我們也可以使用counts.sortByKey()來按字母順序對這些對進行排序,最後count.collect()将它們作為一個對象數組傳回給驅動程式。

注意:在使用自定義對象作為鍵值對操作中的鍵時,必須確定自定義equals()方法附帶有比對的hashCode()方法。有關完整的詳細資訊,請參閱Object.hashCode()文檔中的概述。

Transformations

下表列出了Spark支援的一些常見Transformations。有關詳細資訊,請參閱RDD API文檔(Scala)和RDD函數doc(Scala)。

Transformation 描述
map(func) 通過函數func傳遞源RDD的每個元素來形成一個新的RDD。
filter(func) 通過選擇func傳回true的源的元素傳回一個新的RDD
flatMap(func) 類似于map,但是每個輸入項可以映射到0個或更多個輸出項(是以func應該傳回一個Seq而不是單個項)。
mapPartitions(func) 與map類似,但是在RDD的每個分區(塊)上分别運作,是以當在T型RDD上運作時,func參數必須是Iterator => Iterator 類型。
mapPartitionsWithIndex(func) 類似于mapPartitions,但也提供了一個表示分區索引的整數值的func,是以在T類型的RDD上運作時,func的參數類型必須是(Int,Iterator )=> Iterator 。
sample(withReplacement, fraction, seed) 使用給定的随機數種子對資料的一小部分進行采樣,有或沒有替換。
union(otherDataset) 并集,傳回包含源資料集中的元素和參數的資料集的新資料集。
intersection(otherDataset) 交集,傳回一個新的RDD,其中包含源資料集中的元素和參數的交集。
distinct([numTasks])) 傳回包含源資料集的不同元素的新資料集,類似于Set資料結構
groupByKey([numTasks]) 當在(K,V)對的資料集上調用時,傳回(K,Iterable )對的資料集。注意:如果您正在對每個鍵執行聚合(例如總和或平均),則使用reduceByKey或aggregateByKey将會産生更好的性能。注:預設情況下,輸出中的并行級别取決于父RDD的分區數量。您可以傳遞一個可選的numTasks參數來設定不同數量的任務。
reduceByKey(func, [numTasks]) 當調用(K,V)對的資料集時,傳回(K,V)對的資料集,其中每個鍵的值使用給定的reduce函數func進行聚合,函數func必須是(V,V)=> V.就像在groupByKey中一樣,reduce任務的數量可以通過可選的第二個參數來配置。
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 當在(K,V)對的資料集上時,傳回(K,U)對的資料集,其中使用給定的組合函數和中性的“零”值來彙總每個鍵的值。允許與輸入值類型不同的聚合值類型,同時避免不必要的配置設定。reduce任務的數量可以通過可選的第二個參數來配置。
sortByKey([ascending],[numTasks]) 當在K實作Ordered的(K,V)對的資料集上調用時,按照ascending參數的指定,按照升序或降序傳回按key排序的(K,V)對的資料集。
join(otherDataset, [numTasks]) 當(K,V)和(K,W)類型的資料集被調用時,傳回每個key的所有元素對的(K,(V,W))對的資料集。外連接配接通過leftOuterJoin,rightOuterJoin和fullOuterJoin來支援。
cogroup(otherDataset, [numTasks]) 當(K,V)和(K,W)類型的資料集被調用時,傳回(K,(Iterable ,Iterable ))元組的資料集。這個操作也被稱為groupWith。
cartesian(otherDataset) 當調用類型T和U的資料集時,傳回(T,U)對(所有元素對)的資料集。
pipe(command, [envVars]) 通過shell指令管理RDD的每個分區,例如,一個Perl或bash腳本。RDD元素被寫入程序的stdin,輸出到stdout的行被作為字元串的RDD傳回。
coalesce(numPartitions) 減少RDD中的分區數量為numPartitions。用于在過濾大型資料集後更高效地運作操作。
repartition(numPartitions) 随機調整RDD中的資料以建立更多或更少的分區并在其間進行平衡。這總是通過網絡混洗所有資料。
repartitionAndSortWithinPartitions(partitioner) 根據給定的分區器對RDD進行重新分區,并在每個結果分區中按key排序。這比調用重新分區,然後在每個分區内進行排序更有效率,因為它可以将排序壓入shuffle機制。

Actions

下表列出了Spark支援的一些常用操作。詳細資訊請參閱RDD API文檔(Scala,Java,Python,R) 并配對RDD函數doc(Scala,Java)。

Action 含義
reduce(func) 使用函數func(它接受兩個參數并傳回一個)聚合資料集的元素。該函數應該是可交換參數和關聯的,以便它可以被正确地并行計算。
collect() 在driver 中将資料集的所有元素作為數組傳回。在過濾器或其他操作傳回足夠小的資料子集之後,這通常很有用。
count() 傳回資料集中元素的數量。
first() 傳回資料集的第一個元素(類似于take(1))。
take(n) 用資料集的前n個元素傳回一個數組。
takeSample(withReplacement, num, [seed]) 傳回一個數組的随機樣本數組,有或沒有替換,可以預先指定一個随機數發生器種子。
takeOrdered(n, [ordering]) 使用自然順序或自定義比較器傳回RDD的前n個元素。
saveAsTextFile(path) 将資料集的元素作為文本檔案(或文本檔案集)寫入本地檔案系統,HDFS或任何其他Hadoop支援的檔案系統的給定目錄中。Spark将在每個元素上調用toString将其轉換為檔案中的一行文本。
saveAsSequenceFile(path) (Java and Scala) 将資料集的元素作為Hadoop SequenceFile寫入本地檔案系統,HDFS或任何其他Hadoop支援的檔案系統的給定路徑中。他可以在實作Hadoop的Writable接口的鍵值對的RDD上使用。在Scala中,它也可用于可隐式轉換為Writable的類型
saveAsObjectFile(path) (Java and Scala) 使用Java序列化以簡單的格式寫入資料集的元素,然後可以使用SparkContext.objectFile()加載。
countByKey() 僅适用于類型(K,V)的RDD。傳回(K,Int)對的hashmap和每個鍵的計數。
foreach(func) 在資料集的每個元素上運作函數func。這通常用于副作用,如更新累加器或與外部存儲系統互動。注意:修改foreach()之外的累加器以外的變量可能會導緻未定義的行為。請參閱了解更多細節。

Spark RDD API還公開了一些Action的異步版本,例如foreach的foreachAsync,它立即将FutureAction傳回給調用者,而不是在完成動作時阻塞。

Shuffle 操作

Spark中的某些操作會觸發一個稱為shuffle的事件。shuffle是Spark重新配置設定資料的機制,以便在不同分區之間進行分組。這通常涉及在執行者和機器之間複制資料,使得shuffle成為複雜而昂貴的操作。

背景

為了了解shuffle過程中發生了什麼,我們可以考慮reduceByKey操作的例子。

reduceByKey操作生成一個新的RDD,其中單個鍵的所有值都組合到一個元組中 - 鍵和對與該鍵相關的所有值執行reduce函數的結果。面臨的挑戰是,并不是所有的單個key的值都必須位于同一個分區,甚至是同一個機器上,但是它們必須位于同一地點才能計算出結果。

在Spark中,資料通常不是跨分區分布,而是在特定操作的必要位置。在計算過程中,單個任務将在單個分區上運作 - 是以,要組織單個reduceByKey reduce任務的所有資料執行,Spark需要對全部資料執行操作。它必須從所有分區中讀取所有鍵的值,然後将各個分區上的值彙總在一起,以計算每個鍵的最終結果 - 這就是所謂的shuffle。

雖然新shuffle資料的每個分區中的元素集合是确定性的,分區本身的排序也是确定性的,但這些元素的排序并不是這樣。如果一個人在shuffle之後需要有序資料,那麼可以使用:

  • 使用mapPartitions.如通過sorted對每個分區進行排序
  • repartitionAndSortWithinPartitions在同時進行重新分區的同時有效地對分區進行排序
  • sortBy 來制作全局排序的RDD

可能導緻shuffle的操作包括repartition 操作(如repartition 和 coalesce),“像groupByKey和reduceByKey一樣的ByKey操作(除計數),以及join 操作(如cogroup和join)。

對性能的影響

Shuffle操作是一個很昂貴的操作,因為它涉及到磁盤IO,序列化,網絡傳輸。為了組織資料,Spark生成一組任務 - map任務來組織資料,以及一組reduce任務來聚合它。這個術語來自MapReduce,并不直接與Spark的map和reduce操作有關。

在内部,map任務的結果被儲存在記憶體中,直到它們儲存不下為止。然後,将這些根據key分區進行排序的結果寫入單個檔案。最後在reduce端,讀取與之相關的排序資料塊。

某些shuffle操作會消耗大量的堆記憶體,因為它們在傳輸資料前後會使用記憶體中的資料結構去組織記錄。具體來說,reduceByKey和aggregateByKey在map上建立這些資料結構,’ByKey操作在reduce方面生成這些資料結構。當資料不适合存儲在記憶體中時(即過大),Spark會将這些資料溢出到磁盤,這會導緻額外的磁盤I / O開銷和增加垃圾回收開銷。

Shuffle也會在磁盤上生成大量的中間檔案。從Spark 1.3開始,這些中間檔案将被保留,直到相應的RDD不再使用并被垃圾回收。保留這些中間是為了在執行重新計算時不需要重新shuffle。如果應用程式保留對這些RDD的引用,或者GC不經常引入,垃圾回收可能會在很長一段時間後才會發生。這意味着長時間運作Spark作業可能會消耗大量的磁盤空間。在配置Spark時,臨時存儲目錄由spark.local.dir配置參數指定。

shuffle行為可以通過調整各種配置參數來調整。詳情請參閱“Spark配置指南”中的“Shuffle Behavior”部分。

RDD持久化

Spark中最重要的功能之一就是在記憶體中保留(或緩存)一個資料集。當持久化RDD時,每個節點存儲它在記憶體中所計算的分區,并在該資料集上的其他操作(或從中派生的資料集)中重用它們。這可以使未來的行動更快(通常超過10倍)。緩存是疊代算法和快速互動式使用的關鍵工具。

您可以使用persist()或cache()方法将RDD标記為持久化。第一次在Action中計算時,它将被儲存在節點的記憶體中。Spark的緩存是容錯的 - 如果RDD的任何分區丢失,它将自動重新計算。

另外,RDD可以使用不同的存儲級别進行存儲,例如,您可以将資料集儲存在磁盤上,将其儲存在記憶體中,但作為序列化的Java對象(以節省空間),将其複制到節點上。這些級别通過傳遞一個StorageLevel對象來持久化來設定。cache()預設StorageLevel.MEMORY_ONLY(将反序列化的對象存儲在記憶體中)。

存儲級别分别有:

Storage Level Meaning
MEMORY_ONLY 将RDD作為反序列化的Java對象存儲在JVM中。如果RDD不适合存儲在記憶體,某些分區将不會被緩存,并且每次需要時都會重新進行計算。這是預設級别。
MEMORY_AND_DISK 将RDD作為反序列化的Java對象存儲在JVM中。如果RDD在記憶體存不下,會存儲剩餘的分區到磁盤上,并在需要時從中讀取。
MEMORY_ONLY_SER 将RDD存儲為序列化的Java對象(每個分區一個位元組的數組)。這通常比反序列化的對象更節省空間,特别是在使用快速序列化器的情況下,但需要消耗更多的CPU資源。
MEMORY_AND_DISK_SER 與MEMORY_ONLY_SER類似,但是将不适合記憶體的分區溢出到磁盤,而不是每次需要時重新計算它們。
DISK_ONLY 僅存儲在磁盤上
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 和以上類似,但存儲兩份。
OFF_HEAP (experimental) 存儲在堆外記憶體。需要開啟該功能才能使用

選擇哪個存儲級别?

Spark的存儲級别旨在提供記憶體使用和CPU效率之間的不同折衷。我們建議通過以下過程來選擇一個:

  • 如果您的RDD适合預設的存儲級别(MEMORY_ONLY),請以這種方式存儲它們。這是CPU效率最高的選項,允許RDD上的操作盡可能快地運作。
  • 如果不能以第一種方式存儲,嘗試使用MEMORY_ONLY_SER并選擇一個快速序列化庫來使對象更加節省記憶體空間,但是通路速度仍然相當快。
  • 除非計算你的資料集的函數是昂貴的,否則請不要溢出到磁盤上。除非它們會過濾大量的資料,否則,重新計算分區可能與從磁盤讀取分區一樣快。
  • 如果要快速恢複故障,請使用複制政策的存儲級别。所有的存儲級别通過重新計算丢失的資料來提供完整的容錯能力,但是複制政策的資料可以讓您繼續在RDD上運作任務,而無需等待重新計算丢失的分區。

移除資料

Spark會自動監視每個節點上的高速緩存使用情況,并以最近最少使用(LRU)方式删除舊的資料分區。如果您想要手動删除RDD,而不是等待其從緩存中删除,請使用RDD.unpersist()方法。

共享變量

通常,在遠端叢集節點上執行傳遞給Spark操作(如map或reduce)的函數時,它将在函數中使用的所有變量的副本上運作。這些變量被複制到每台機器上,遠端機器上的變量沒有更新到驅動程式。支援通用的、可讀寫的共享變量将是低效的,但是,Spark為兩種常見使用模式提供了兩種有限類型的共享變量:廣播變量和累加器。

廣播變量

廣播變量允許程式員在每台機器上儲存一個隻讀變量,而不是用任務發送一個隻讀變量的副本。例如,可以使用它們以有效的方式為每個節點提供大型輸入資料集的副本。Spark還嘗試使用高效的廣播算法來分發廣播變量,以降低通信成本。

Spark Action是通過一系列Stage執行的,Stage由分散的shuffle操作分開。Spark會自動廣播每個階段中任務所需的通用資料。以這種方式廣播的資料以序列化形式緩存,并在運作每個任務之前反序列化。這意味着隻有跨多個階段的任務需要相同的資料或以反序列化的形式緩存資料時,顯式建立廣播變量才是有用的。

廣播變量是通過調用SparkContext.broadcast(v)從變量v建立的。廣播變量是v的一個封裝,它的值可以通過調用value方法來通路。下面的代碼展示如何使用:

scala> val broadcastVar = sc.broadcast(Array(, , ))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast()

scala> broadcastVar.value
res0: Array[Int] = Array(, , )
           

在建立廣播變量之後,應該在群集上運作的任何函數中使用值而不是值v,以便v不會多次傳送到節點。另外,對象v在廣播後不應該被修改,以確定所有節點獲得相同的廣播變量值

累加器

累加器是僅通過關聯和交換操作“添加”的變量,是以可以有效地支援并行操作。它們可以用來實作計數器(如在MapReduce中)或者和計算。

Spark本身支援數字類型的累加器,程式員可以添加其對新類型的支援。

作為使用者,您可以建立已命名或未命名的累加器。如下圖所示,一個已命名的累加器(在這種情況下計數器)将顯示在修改該累加器的階段的Web UI中。Spark在“任務”表中顯示每次任務修改的累加器的值。

Spark 2.2.1 官方文檔翻譯 RDD程式設計指南(RDD Programming Guide)概覽通過編寫應用程式使用Spark通過Spark-Shell使用Spark彈性分布式資料集(RDD)共享變量單元測試

跟蹤使用者界面中的累加器對于了解運作階段的進度非常有用(注意:Python尚不支援)。

可以通過調用SparkContext.longAccumulator()或SparkContext.doubleAccumulator()來分别累積Long或Double類型的值來建立數字累加器。在群集上運作的任務可以使用add方法添加它。但是,他們無法讀它的值。隻有Driver可以使用其value方法讀取累加器的值。

下面的代碼顯示了一個累加器被用來計算一個數組的和:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: , name: Some(My Accumulator), value: )

scala> sc.parallelize(Array(, , , )).foreach(x => accum.add(x))
...
// :: INFO SparkContext: Tasks finished in  s

scala> accum.value
res2: Long = 
           

雖然這段代碼使用了對Long類型的累加器的内置支援,程式員也可以通過繼承AccumulatorV2來建立自己的類型。

AccumulatorV2抽象類有幾個方法必須重寫:reset方法重置累加器為零,add方法用于向累加器中添加另一個值,merge方法合并另一個相同類型的累加器到這個累加器中。其他必須被覆寫的方法包含在API文檔中。

例如,假設我們有一個表示數學向量的MyVector類,我們可以這樣寫:

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
           

請注意,當程式員定義自己的AccumulatorV2類型時,結果資料類型可能與添加元素的資料類型不同。

對于僅在Action内執行的累加器更新,Spark保證每個任務對累加器的更新隻會應用一次,即重新啟動的任務不會更新該值。

在transformations中,使用者應該意識到,如果任務或作業階段被重新執行,每個任務的更新可能會被應用多次。

累加器不會改變Spark的懶惰加載模型。如果它們在RDD上的操作中被更新,則其值僅在RDD作為動作的一部分計算之後才被更新。是以,在像map()這樣的惰性轉換中進行累加器更新并不能保證執行。下面的代碼片段示範了這個屬性:

val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still  because no actions have caused the map operation to be computed.
           

單元測試

Spark對任何流行的單元測試架構的單元測試都很友好。隻需在主URL設定為本地的情況下在測試中建立一個SparkContext,運作您的操作,然後調用SparkContext.stop()将其關閉。確定在最後調用SparkContext.stop(),因為Spark不支援在同一個程式中同時運作兩個SparkContext。

繼續閱讀