天天看點

Spark Programming Guide概述建構spark程式初始化 SparkRDD共享變量部署到叢集java和scala的啟動方式

Table of Contents

概述

建構spark程式

初始化 Spark

Spark-shell

RDD

并行集合

外部資料集

RDD算子

基礎

将函數作為參數傳遞

閉包

Shuffle 

RDD 持久化

如何選擇存儲級别

共享變量

Broadcast

Accumulators

部署到叢集

java和scala的啟動方式

概述

每個 Spark 程式都有一個 Driver 程式,該程式用來執行 main 函數,建立 SparkContext,準備 Spark 程式的執行環境。彈性分布式資料集 RDD,是Spark 中的基本資料結構,通過 HDFS 檔案系統或者另一個 RDD 來建立。也可以持久化到記憶體中。

第二個抽象的概念是可以在并行的任務中使用相同的變量,即共享變量。Spark支援兩種類型的共享變量:廣播變量(可用于在所有節點的記憶體中緩存一個值)和累加器(僅“添加”到其中的變量,如計數器和)。

建構spark程式

預設情況下,Spark 2.1.1是使用Scala 2.11建構和釋出的。(Spark也可以與Scala的其他版本相容。)要用 Scala 編寫應用程式,你需要使用一個相容的Scala版本(例如2.11.X)。

<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
<spark.version>2.1.1</spark.version>
           
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
           

需要導入的類

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
           

初始化 Spark

先建立一個 SparkConf 配置資訊對象,再建立一個 SparkContext 上下文對象。

val conf = new SparkConf().setAppName(appName).setMaster(master)
val context = new SparkContext(conf)
           

每個 JVM 隻能激活一個 SparkContext。在建立新上下文之前,必須 context.stop() 來停止目前活動的 SparkContext。

appName 是你的程式在叢集 UI 界面上顯示的名稱。master 是 Messo、Yarn 或 local 本地模式。

Spark-shell

使用 Spark-shell 已經建立了一個預設的 SparkContext 對象,可以通過 --master 指定送出的資源管理叢集,通過 --jars 傳一個都好分隔的 jar 添加到 classpath 中,通過 --packages 自定maven 依賴。如下使用4個core來執行:

$ ./bin/spark-shell --master local[4]
           

添加 jar 到 classpath

$ ./bin/spark-shell --master local[4] --jars code.jar
           

指定 maven 坐标

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
           
[[email protected] bin]# ./spark-shell --help
Usage: ./bin/spark-shell [options]

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          使用者程式 main class (for Java / Scala apps).
  --name NAME                 application 名稱
  --jars JARS                 driver 和 executer classpath上的逗号分隔的本地jar
  --packages                  格式為:groupId:artifactId:version
  --exclude-packages          groupId:artifactId,避免 --packages 中的依賴沖突
  --repositories              附加遠端存儲庫的逗号分隔清單,搜尋——包給出的maven坐标。

  --py-files PY_FILES         要放置的.zip、.egg或.py檔案的逗号分隔清單,Python應用程式的            
                              Python路徑。

  --files FILES               要放置在工作中的檔案的逗号分隔清單
                              每個 executer 的目錄。

  --conf PROP=VALUE           任意 spark 的配置屬性
  --properties-file FILE      加載額外屬性的檔案的路徑。 如果不    
                              指定将使用 conf/spark-defaults.conf.

  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  --driver-java-options       Extra Java options to pass to the driver.
  --driver-library-path       Extra library path entries to pass to the driver.
  --driver-class-path         Extra class path entries to pass to the driver. Note that
                              jars added with --jars are automatically included in the
                              classpath.

  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

  --proxy-user NAME           User to impersonate when submitting the application.
                              This argument does not work with --principal / --keytab.

  --help, -h                  Show this help message and exit.
  --verbose, -v               Print additional debug output.
  --version,                  Print the version of current Spark.

 Spark standalone with cluster deploy mode only:
  --driver-cores NUM          Cores for driver (Default: 1).

 Spark standalone or Mesos with cluster deploy mode only:
  --supervise                 If given, restarts the driver on failure.
  --kill SUBMISSION_ID        If given, kills the driver specified.
  --status SUBMISSION_ID      If given, requests the status of the driver specified.

 Spark standalone and Mesos only:
  --total-executor-cores NUM  Total cores for all executors.

 Spark standalone and YARN only:
  --executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode,
                              or all available cores on the worker in standalone mode)

 YARN-only:
  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
                              (Default: 1).
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
  --num-executors NUM         Number of executors to launch (Default: 2).
                              If dynamic allocation is enabled, the initial number of
                              executors will be at least NUM.
  --archives ARCHIVES         Comma separated list of archives to be extracted into the
                              working directory of each executor.
  --principal PRINCIPAL       Principal to be used to login to KDC, while running on
                              secure HDFS.
  --keytab KEYTAB             The full path to the file that contains the keytab for the
                              principal specified above. This keytab will be copied to
                              the node running the Application Master via the Secure
                              Distributed Cache, for renewing the login tickets and the
                              delegation tokens periodically.
           

RDD

建立 RDD 有兩種方法:并行化驅動程式中的現有集合,或者引用外部存儲系統中的資料集,比如共享檔案系統、HDFS、HBase 或任何提供 Hadoop InputFormat 的資料源。

并行集合

并行化集合是通過在程式中調用 SparkContext 的 parallelize 方法來建立的。将集合的元素複制以形成可并行操作的分布式資料集。例如,這裡是如何建立一個并行的集合容納數字1到5:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
           

建立完成之後,就可以對 RDD 進行操作了,如使用 distData.reduce((a, b) => a + b) 對清單中的資料進行累加。

并行集合的計算的一個重要參數是按指定數量對資料進行分區。每一個分區對應一個 task,一般情況下一個 CPU 對應2-4個分區,spark 叢集會自動設定分區數量。也可以手動将分區數量作為第二個參數傳入:sc.parallelize(data, 10)。

外部資料集

Spark 可以從 Hadoop 支援的任何存儲源建立分布式資料集,包括本地檔案系統、HDFS、Cassandra、HBase、Amazon S3等。Spark 支援文本檔案、Sequence 檔案和任何其他 Hadoop InputFormat。

使用 SparkContext 的 textFile 方法建立文本檔案 RDD,擷取檔案的位址:本地路徑、hdfs:\\、s3n://,并将其作為行 list 讀取,如下執行個體:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
           

建立完成之後,就可以對資料集進行計算,如使用 distFile.map(s=>s.length).reduce((a,b)=>a+b) 來計算字元數量。

以下是關于讀檔案的注意事項:

  • 如果使用本地路徑,則檔案必須在所有 work 節點上,或者共享的網絡上。
  • Spark 所有基于檔案的輸入方法,支援目錄、壓縮檔案和通配符,例如:

    textFile("/my/directory")

    ,

    textFile("/my/directory/*.txt")

    , and

    textFile("/my/directory/*.gz")

  • textFile 還接收第二個參數用來控制分區的數量。預設情況下,一個資料塊産生一個分區,可以通過傳入第二個參數來增加分區,注意:分區的數量不能少于資料塊的數量

除了文本檔案,scala 還支援其他幾種資料格式:

  • sparkConext.wholeTextFiles 可以讀取包含多個小檔案的目錄,并以(filename,content)鍵值對的形式傳回,與 textFile 相反,每個檔案每行作為一條記錄傳回。
  • 對于 SequenceFile,SparkContext.sequenceFile[K,V]方法中的 KV 對應檔案中鍵和值的類型。應該是 Hadoop 的可寫接口的子類,比如 IntWritable 和 Text,Spark 支援一些常見的類型,如 sequenceFile[Int, Stirng] 對應 Intwritable 和 Text。
  • 對于其他的 Hadoop InputFormat,可以使用 SparkContext.hadoopRDD 方法,通過傳入 JobConf、輸入格式類、key 類、value 類。與 Hadoop 相同的方式,sparkContext.newAPIHadoopRDD 
  • RDD.saveAsObjectFile 和 SparkContext.objectFile 支援java 序列化的方式儲存 RDD。但是沒有 Avro 格式好,但是提供最簡單的方式儲存任何 RDD

RDD算子

RDD 支援兩種類型的操作:action 和 transformation。例如 map 是一個 transformation 算子,通過傳遞一個函數并傳回一個新的 RDD 資料集。reduce 算子将使用傳入的函數聚合所有 RDD 資料集,并将結果傳回給 Driver。

所有的 transformation 算子都是惰性的,也就是說真正的開始執行發生在 action 算子上,這種設計使 Spark 的計算更高效的執行。

預設情況下,可以連續的對 RDD 進行 transformation,也可以使用persist 和 cache 方法 将資料持久化到叢集記憶體中,用來減少 IO,提高計算效率,這是 Spark 的核心特征,也可以持久化到磁盤或跨多個節點複制 RDD。

基礎

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
           
  • 第一行定義了來自外部檔案的資料 RDD。這個資料集沒有加載到記憶體中,也沒有在其他地方執行:行隻是指向檔案的指針。
  • 第二行将 lineLengths 定義為映射轉換的結果。同樣,由于懶惰,lineLengths不是立即計算的。
  • 最後,運作 reduce,這是一個動作。此時,Spark 将計算分解為在不同的機器上運作的任務,每台機器都運作其部分映射和局部約簡,隻向驅動程式傳回其答案。

如果以後還要使用該資料的話,可以持久化:

lineLengths.persist()
           

在 reduce 之前,會在第一次計算之後儲存到記憶體中。

将函數作為參數傳遞

Spark 的 API 嚴重依賴傳遞給 Driver 的函數,通常有兩種傳遞方式:

  • 匿名函數;
  • 全局單例對象中的靜态方法。如:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)
           

注意:将引用傳遞給類中的方法時,也需要将類中的對象和方法傳進去。如:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
           
class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
           

 等同于編寫rdd.map(x = > this.field + x),它引用了所有這些。為了避免這個問題,最簡單的方法是将字段複制到一個局部變量中,而不是從外部通路它:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}
           

閉包

涉及共享變量和方法的有效範圍和生命周期。

例子

下面簡單的 RDD 元素累加,單機模式和叢集模式下是不同的。

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)
           

Local vs. cluster modes

上述代碼的行為是未定義的,可能無法按預期工作。為了執行作業,Spark 将 RDD 操作的處理分解為任務,每個任務由執行程式執行。在執行之前,Spark 計算任務的閉包。閉包是那些執行程式在 RDD 上執行其計算時必須可見的變量和方法(在本例中為foreach())。這個閉包被序列化并發送給每個執行器。

傳遞給每個執行器的閉包中的變量是副本,是以,當在 foreach 函數中引用 counter 時,它不再是 Dirver 節點上的計數器。在 Driver 節點的記憶體中仍然有一個計數器,但它對 Executrer 不再可見!Executer 隻看到來自序列化閉包的副本。是以,counter 的最終值仍然是零,因為 counter 上的所有操作都引用了序列化閉包中的值。

在本地模式下某些情況,foreach 函數實際上會在與 Driver 相同的 JVM 中執行,并引用相同的原始計數器,并可能實際更新它。

為了確定在這類場景中定義良好的行為,應該使用累加器。Spark 中的累加器專門用于提供一種機制,以便在叢集中的工作節點之間執行分割時安全地更新變量。

一般來說,像循環或局部定義方法這樣的閉包結構不應該用來改變全局狀态。Spark不定義或保證閉包外部引用的對象的突變行為。一些這樣做的代碼可能在本地模式下工作,但那隻是偶然的,而且這樣的代碼在分布式模式下不會像預期的那樣工作。如果需要全局聚合,則使用累加器。

Printing elements of an RDD

另一個常見的習慣用法是嘗試使用 RDD.foreach(println) 或 RDD.map(println)列印出 RDD 的元素。在一台機器上,這将生成預期的輸出并列印所有的 RDD 元素。但是,在叢集模式下,執行器調用的 stdout 輸出現在是寫入執行器的 stdout,而不是寫入驅動程式上的 stdout,是以驅動程式上的 stdout 不會顯示這些!要列印驅動程式上的所有元素,可以使用 collect() 方法首先将 RDD 帶到驅動程式節點,如下所示:RDD.collect().foreach(println)。這可能會導緻 Driver 的記憶體耗盡。

KV鍵值對類型的資料在 Spark 是如何使用、Transformation 算子、Action 算子:Spark RDD 算子

Shuffle 

shuffle 是 spark 中對資料進行分區和分組的一種機制,通常是跨 Executer 和機器複制資料,使其成為耗資源的操作,帶有 shuffle 的算子有:

sortByKey
repartitionAndSortWithinPartitions
partitionBy
coalesce
combineByKey
aggregateByKey
foldByKey
reduceByKey
countApproxDistinctByKey
groupByKey
cogroup
subtractByKey
           

背景

為了了解在 shuffle 期間會發生什麼,我們以 reduceByKey 為例子。reduceByKey 生成一個新的 RDD,其中單個鍵的所有值都被組合成一個元組(鍵和對)與該鍵關聯的所有值執行 reduce 函數的結果。挑戰在于,單個鍵的所有值不一定都位于相同的分區,甚至也不一定位于同一台機器上,但它們必須位于同一位置才能計算結果。

在 Spark 中,資料不會存放在指定的位置,計算的時候,任務會分發到資料節點中執行,reduceByKey 中的 reduce 從所有分區中讀取所有 key 的值,然後将各個分區的值放在一起,計算每個 key 的最終結果,整個過程稱之為 shuffle。

  • mappartition來對每個分區進行排序,例如,使用.sort
  • repartitionAndSortWithinPartitions可以有效地對分區進行排序,同時進行重新分區
  • 建立一個全局有序的RDD

可能導緻混亂的算子包括 repartition 算子(如 repartition 和  coalesce)、ByKey 算子(除 counting 外)(如 groupByKey 和 reduceByKey )以及 join 算子(如 cogroup 和 join)。

性能

shuffle 操作會産生磁盤IO、資料序列化、網絡IO,是以會影響整個流處理的性能。

在 Spark 内部,來自單個 map 任務的結果會儲存在記憶體中,知道任務結束。然後根據目标分區排序并寫入檔案。在 reduce 端,讀取相關的資料塊。

某些 shuffle 算子會消耗大量堆記憶體,因為它們使用記憶體中的資料結構來組織傳輸之前或之後的記錄。具體來說,reduceByKey 和 aggregateByKey 在 map 端建立這些結構,而' ByKey 操作在 reduce 端生成這些結構。當資料不适合記憶體時,Spark 會将這些表溢出到磁盤,導緻磁盤I/O的額外開銷和增加的垃圾收集。

Shuffle 還會在磁盤上生成大量的中間檔案。從Spark 1.3開始,這些檔案将一直保留到不再使用相應的 RDDs 并進行垃圾收集。這樣做是為了在重新計算時不需要重新建立 shuffle 檔案。如果應用程式保留對這些 rdds 的引用,或者 GC 不經常啟動,那麼垃圾收集可能隻會在很長一段時間之後才會發生。這意味着長時間運作的 Spark 作業可能會消耗大量磁盤空間。SparkContxt 的 

spark.local.dir 用來配置指定緩存檔案存儲的目錄。

RDD 持久化

跨機器在記憶體中持久化 RDD,每個節點儲存 RDD 的一部分可計算資料。通過 persist 和 cache 方法來持久化。Spark 的緩存是容錯性的,如果資料丢失,spark 會從最初建立他的算子中自動重新計算。此外,持久化是有存儲級别的:

Storage Level Meaning
MEMORY_ONLY 将 RDD 作為反序列化的 Java 對象存儲在 JVM 中。如果 RDD 不适合記憶體,那麼一些分區将不會被緩存,而是在需要它們時動态地重新計算。這是預設級别。
MEMORY_AND_DISK 将 RDD 作為反序列化的 Java 對象存儲在 JVM 中。如果 RDD 不适合記憶體,那麼将不适合的分區存儲在磁盤上,并在需要時從那裡讀取它們。

MEMORY_ONLY_SER

(Java and Scala)

将 RDD 存儲為序列化的 Java 對象(每個分區一個位元組數組)。這通常比反序列化對象更節省空間,特别是在使用快速序列化器時,但讀取時需要更多cpu。

MEMORY_AND_DISK_SER

(Java and Scala)

類似于 MEMORY_ONLY_SER,但是将不适合記憶體的分區溢出到磁盤,而不是每次需要時動态地重新計算它們。
DISK_ONLY 隻在磁盤上存儲 RDD 分區。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 與上面的級别相同,但是在兩個叢集節點上複制每個分區。
OFF_HEAP (experimental) 類似 于MEMORY_ONLY_SER,但将資料存儲在堆外記憶體中。這需要啟用堆外記憶體。
注意:在Python中,存儲的對象将始終使用Pickle庫進行序列化,是以是否選擇序列化級别并不重要。Python中可用的存儲級别包括MEMORY_ONLY、MEMORY_ONLY_2、MEMORY_AND_DISK、MEMORY_AND_DISK_2、DISK_ONLY和DISK_ONLY_2。

甚至在沒有使用者調用 persist 的情況下,Spark 也會自動持久化一些 shuffle 算子中的中間資料(例如 reduceByKey)。這樣做是為了避免在節點轉移期間失敗時重新計算整個輸入。我們仍然建議使用者在計劃重用結果 RDD 時調用 persist。

如何選擇存儲級别

存儲級别實際上就是在 cpu 和記憶體上做權衡。

  • MEMORY_ONLY 是cpu效率最高的選項,允許 rdd 上的算子盡可能快的執行。
  • 使用MEMORY_ONLY_SER并選擇一個快速序列化庫,使對象更節省空間,且通路速度仍然相當快。(Java和Scala)
  • 如果重新計算的速度比讀磁盤的速度快,那麼就不要持久化到磁盤。
  • 如果需要快速的故障恢複,請使用複制的存儲級别(例如,如果使用Spark為來自web應用程式的請求提供服務)。通過重新計算丢失的資料,所有存儲級别都提供了完全的容錯能力,但是複制的存儲級别允許您在RDD上繼續運作任務,而不必等待重新計算丢失的分區。

Spark 自動監視每個節點上的緩存使用情況,并以最近最少使用(LRU)的方式删除舊的資料分區。 也可以使用RDD.unpersist()方法直接删除。

共享變量

顧名思義,就是在不同節點上操作同一個變量。Spark 提供了以下兩種方式。

Broadcast

廣播是在每台機器上緩存一個隻讀變量,不是将其副本與任務一起發送。

隻有當跨多個階段的任務需要相同的資料,或者以反序列化的形式緩存資料很重要時,顯式地建立廣播變量才有用。

Broadcast 變量是通過調用 SparkContext.broadcast(v) 從變量 v 建立的。broadcast 變量是 v 的包裝器,它的值可以通過調用value 方法來通路。代碼如下所示:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
           

對象被執行廣播操作之後不應該再修改,以確定所有節點得到廣播變量的相同值。

Accumulators

可以通過調用SparkContext.longAccumulator()或SparkContext.doubleAccumulator()來分别累積Long或Double類型的值來建立數值累加器。然後,可以使用add方法将運作在叢集上的任務添加到叢集中。然而,他們無法讀取它的值。隻有 Driver 可以讀取累加器的值,使用 value 方法。

下面的代碼顯示了一個累加器,用于将數組中的元素相加:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

scala> accum.value
res2: Long = 10
           

也可以繼承 AccumulatorV2 實作自定義計數器,需要重寫以下方法:reset(将累加器重置為零)、add(将另一個值添加到累加器中)、merge(将另一個相同類型的累加器合并到這個累加器中)。

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
           
自定義累加器結果類型可能與添加的元素的類型不同。

累加器的更新隻存在于 action 算子,Spark 保證每個任務對累加器的更新隻應用一次,即重新開機任務不會再次更新。但是在 Transformation 算子中重新開機任務每個任務的更新可能會是多次。

累加器的更新隻在 rdd 的 action 算子中進行。是以不能保證在 map() 這樣的延遲算子中執行累加器的更新。如下代碼:

val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// 至此,累加器的值還是0,因為沒有action 算子開始執行計算
           

部署到叢集

Spark Submitting Applications Guide

java和scala的啟動方式

啟動Spark應用程式的庫。

這個庫允許應用程式以程式設計方式啟動Spark。這個庫隻有一個入口點——SparkLauncher類。可用于啟動Spark并提供一個助手來監視和控制運作中的應用程式:

import org.apache.spark.launcher.SparkAppHandle;
   import org.apache.spark.launcher.SparkLauncher;

   public class MyLauncher {
     public static void main(String[] args) throws Exception {
       SparkAppHandle handle = new SparkLauncher()
         .setAppResource("/my/app.jar")
         .setMainClass("my.spark.app.Main")
         .setMaster("local")
         .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
         .startApplication();
       // Use handle API to monitor / control application.
     }
   }
 
           

也可以使用SparkLauncher.launch()方法來啟動原始的子程序:

import org.apache.spark.launcher.SparkLauncher;

   public class MyLauncher {
     public static void main(String[] args) throws Exception {
       Process spark = new SparkLauncher()
         .setAppResource("/my/app.jar")
         .setMainClass("my.spark.app.Main")
         .setMaster("local")
         .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
         .launch();
       spark.waitFor();
     }
   }
           

此方法要求調用代碼手動管理子程序,包括其輸出流(以避免可能的死鎖)。建議使用 SparkLauncher.startApplication( org.apache.spark.launcher.SparkAppHandle.Listener...) 

原文位址:http://spark.apache.org/docs/2.1.1/api/java/index.html?org/apache/spark/launcher/package-summary.html

繼續閱讀