天天看點

SparkCore 詳解一、 RDD概念二、 RDD程式設計

  • 一 RDD概念
    • 1 RDD為什麼會産生
    • 2 RDD概述
      • 21 什麼是RDD
      • 22 RDD的屬性
    • 3 13 RDD彈性
    • 4 RDD特點
      • 41 分區
      • 42 隻讀
      • 43 依賴
      • 44 緩存
      • 45 checkpoint
  • 二 RDD程式設計
    • 1 程式設計模型
    • 2 22 建立RDD

一、 RDD概念

1.1 RDD為什麼會産生

RDD是Spark的基石,是實作Spark資料處理的核心抽象。那麼RDD為什麼會産生呢?

Hadoop的MapReduce是一種基于資料集的工作模式,面向資料,這種工作模式一般是從存儲上加載資料集,然後操作資料集,最後寫入實體儲存設備。資料更多面臨的是一次性處理。

MR的這種方式對資料領域兩種常見的操作不是很高效。第一種是疊代式的算法。比如機器學習中ALS、凸優化梯度下降等。這些都需要基于資料集或者資料集的衍生資料反複查詢反複操作。MR這種模式不太合适,即使多MR串行處理,性能和時間也是一個問題。資料的共享依賴于磁盤。另外一種是互動式資料挖掘,MR顯然不擅長。

MR中的疊代:

SparkCore 詳解一、 RDD概念二、 RDD程式設計

Spark中的疊代:

SparkCore 詳解一、 RDD概念二、 RDD程式設計

我們需要一個效率非常快,且能夠支援疊代計算和有效資料共享的模型,Spark應運而生。RDD是基于工作集的工作模式,更多的是面向工作流。

但是無論是MR還是RDD都應該具有類似位置感覺、容錯和負載均衡等特性。

1.2 RDD概述

1.2.1 什麼是RDD

RDD(Resilient Distributed Dataset)叫做分布式資料集,是Spark中最基本的資料抽象,它代表一個不可變、可分區、裡面的元素可并行計算的集合。在 Spark 中,對資料的所有操作不外乎建立 RDD、轉化已有RDD 以及調用 RDD 操作進行求值。每個 RDD 都被分為多個分區,這些分區運作在叢集中的不同節點上。RDD 可以包含 Python、Java、Scala 中任意類型的對象, 甚至可以包含使用者自定義的對象。RDD具有資料流模型的特點:自動容錯、位置感覺性排程和可伸縮性。RDD允許使用者在執行多個查詢時顯式地将工作集緩存在記憶體中,後續的查詢能夠重用工作集,這極大地提升了查詢速度。

RDD支援兩種操作:轉化操作和行動操作。RDD 的轉化操作是傳回一個新的 RDD的操作,比如 map()和 filter(),而行動操作則是向驅動器程式傳回結果或把結果寫入外部系統的操作。比如 count() 和 first()。

Spark采用惰性計算模式,RDD隻有第一次在一個行動操作中用到時,才會真正計算。Spark可以優化整個計算過程。預設情況下,Spark 的 RDD 會在你每次對它們進行行動操作時重新計算。如果想在多個行動操作中重用同一個 RDD,可以使用 RDD.persist() 讓 Spark 把這個 RDD 緩存下來。

1.2.2 RDD的屬性

SparkCore 詳解一、 RDD概念二、 RDD程式設計

1) 一組分片(Partition),即資料集的基本組成機關。對于RDD來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會采用預設值。預設值就是程式所配置設定到的CPU Core的數目。

2) 一個計算每個分區的函數。Spark中RDD的計算是以分片為機關的,每個RDD都會實作compute函數以達到這個目的。compute函數會對疊代器進行複合,不需要儲存每次計算的結果。

3) RDD之間的依賴關系。RDD的每次轉換都會生成一個新的RDD,是以RDD之間就會形成類似于流水線一樣的前後依賴關系。在部分分區資料丢失時,Spark可以通過這個依賴關系重新計算丢失的分區資料,而不是對RDD的所有分區進行重新計算。

4) 一個Partitioner,即RDD的分片函數。目前Spark中實作了兩種類型的分片函數,一個是基于哈希的HashPartitioner,另外一個是基于範圍的RangePartitioner。隻有對于于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。

5) 一個清單,存儲存取每個Partition的優先位置(preferred location)。對于一個HDFS檔案來說,這個清單儲存的就是每個Partition所在的塊的位置。按照“移動資料不如移動計算”的理念,Spark在進行任務排程的時候,會盡可能地将計算任務配置設定到其所要處理資料塊的存儲位置。

RDD是一個應用層面的邏輯概念。一個RDD多個分片。RDD就是一個中繼資料記錄集,記錄了RDD記憶體所有的關系資料。

SparkCore 詳解一、 RDD概念二、 RDD程式設計

1.3 1.3 RDD彈性

  • 1) 自動進行記憶體和磁盤資料存儲的切換 

    Spark優先把資料放到記憶體中,如果記憶體放不下,就會放到磁盤裡面,程式進行自動的存儲切換

  • 2) 基于血統的高效容錯機制 

    在RDD進行轉換和動作的時候,會形成RDD的Lineage依賴鍊,當某一個RDD失效的時候,可以通過重新計算上遊的RDD來重新生成丢失的RDD資料。

  • 3) Task如果失敗會自動進行特定次數的重試 

    RDD的計算任務如果運作失敗,會自動進行任務的重新計算,預設次數是4次。

  • 4) Stage如果失敗會自動進行特定次數的重試 

    如果Job的某個Stage階段計算失敗,架構也會自動進行任務的重新計算,預設次數也是4次。

  • 5) Checkpoint和Persist可主動或被動觸發 

    RDD可以通過Persist持久化将RDD緩存到記憶體或者磁盤,當再次用到該RDD時直接讀取就行。也可以将RDD進行檢查點,檢查點會将資料存儲在HDFS中,該RDD的所有父RDD依賴都會被移除。

  • 6) 資料排程彈性 

    Spark把這個JOB執行模型抽象為通用的有向無環圖DAG,可以将多Stage的任務串聯或并行執行,排程引擎自動處理Stage的失敗以及Task的失敗。

  • 7) 資料分片的高度彈性 

    可以根據業務的特征,動态調整資料分片的個數,提升整體的應用執行效率。

RDD全稱叫做彈性分布式資料集(Resilient Distributed Datasets),它是一種分布式的記憶體抽象,表示一個隻讀的記錄分區的集合,它隻能通過其他RDD轉換而建立,為此,RDD支援豐富的轉換操作(如map, join, filter, groupBy等),通過這種轉換操作,新的RDD則包含了如何從其他RDDs衍生所必需的資訊,是以說RDDs之間是有依賴關系的。基于RDDs之間的依賴,RDDs會形成一個有向無環圖DAG,該DAG描述了整個流式計算的流程,實際執行的時候,RDD是通過血緣關系(Lineage)一氣呵成的,即使出現資料分區丢失,也可以通過血緣關系重建分區,總結起來,基于RDD的流式計算任務可描述為:從穩定的實體存儲(如分布式檔案系統)中加載記錄,記錄被傳入由一組确定性操作構成的DAG,然後寫回穩定存儲。另外RDD還可以将資料集緩存到記憶體中,使得在多個操作之間可以重用資料集,基于這個特點可以很友善地建構疊代型應用(圖計算、機器學習等)或者互動式資料分析應用。可以說Spark最初也就是實作RDD的一個分布式系統,後面通過不斷發展壯大成為現在較為完善的大資料生态系統,簡單來講,Spark-RDD的關系類似于Hadoop-MapReduce關系。

1.4 RDD特點

RDD表示隻讀的分區的資料集,對RDD進行改動,隻能通過RDD的轉換操作,由一個RDD得到一個新的RDD,新的RDD包含了從其他RDD衍生所必需的資訊。RDDs之間存在依賴,RDD的執行是按照血緣關系延時計算的。如果血緣關系較長,可以通過持久化RDD來切斷血緣關系。

1.4.1 分區

SparkCore 詳解一、 RDD概念二、 RDD程式設計

1.4.2 隻讀

如下圖所示,RDD是隻讀的,要想改變RDD中的資料,隻能在現有的RDD基礎上建立新的RDD。

SparkCore 詳解一、 RDD概念二、 RDD程式設計

由一個RDD轉換到另一個RDD,可以通過豐富的操作算子實作,不再像MapReduce那樣隻能寫map和reduce了,如下圖所示。

SparkCore 詳解一、 RDD概念二、 RDD程式設計

RDD的操作算子包括兩類,一類叫做transformations,它是用來将RDD進行轉化,建構RDD的血緣關系;另一類叫做actions,它是用來觸發RDD的計算,得到RDD的相關計算結果或者将RDD儲存的檔案系統中。下圖是RDD所支援的操作算子清單。

SparkCore 詳解一、 RDD概念二、 RDD程式設計

1.4.3 依賴

RDDs通過操作算子進行轉換,轉換得到的新RDD包含了從其他RDDs衍生所必需的資訊,RDDs之間維護着這種血緣關系,也稱之為依賴。如下圖所示,依賴包括兩種,一種是窄依賴,RDDs之間分區是一一對應的,另一種是寬依賴,下遊RDD的每個分區與上遊RDD(也稱之為父RDD)的每個分區都有關,是多對多的關系。

SparkCore 詳解一、 RDD概念二、 RDD程式設計

通過RDDs之間的這種依賴關系,一個任務流可以描述為DAG(有向無環圖),如下圖所示,在實際執行過程中寬依賴對應于Shuffle(圖中的reduceByKey和join),窄依賴中的所有轉換操作可以通過類似于管道的方式一氣呵成執行(圖中map和union可以一起執行)。

SparkCore 詳解一、 RDD概念二、 RDD程式設計

1.4.4 緩存

如果在應用程式中多次使用同一個RDD,可以将該RDD緩存起來,該RDD隻有在第一次計算的時候會根據血緣關系得到分區的資料,在後續其他地方用到該RDD的時候,會直接從緩存處取而不用再根據血緣關系計算,這樣就加速後期的重用。如下圖所示,RDD-1經過一系列的轉換後得到RDD-n并儲存到hdfs,RDD-1在這一過程中會有個中間結果,如果将其緩存到記憶體,那麼在随後的RDD-1轉換到RDD-m這一過程中,就不會計算其之前的RDD-0了。

SparkCore 詳解一、 RDD概念二、 RDD程式設計

1.4.5 checkpoint

雖然RDD的血緣關系天然地可以實作容錯,當RDD的某個分區資料失敗或丢失,可以通過血緣關系重建。但是對于長時間疊代型應用來說,随着疊代的進行,RDDs之間的血緣關系會越來越長,一旦在後續疊代過程中出錯,則需要通過非常長的血緣關系去重建,勢必影響性能。為此,RDD支援checkpoint将資料儲存到持久化的存儲中,這樣就可以切斷之前的血緣關系,因為checkpoint後的RDD不需要知道它的父RDDs了,它可以從checkpoint處拿到資料。

給定一個RDD我們至少可以知道如下幾點資訊:1、分區數以及分區方式;2、由父RDDs衍生而來的相關依賴資訊;3、計算每個分區的資料,計算步驟為:1)如果被緩存,則從緩存中取的分區的資料;2)如果被checkpoint,則從checkpoint處恢複資料;3)根據血緣關系計算分區的資料。

二、 RDD程式設計

2.1 程式設計模型

在Spark中,RDD被表示為對象,通過對象上的方法調用來對RDD進行轉換。經過一系列的transformations定義RDD之後,就可以調用actions觸發RDD的計算,action可以是向應用程式傳回結果(count, collect等),或者是向存儲系統儲存資料(saveAsTextFile等)。在Spark中,隻有遇到action,才會執行RDD的計算(即延遲計算),這樣在運作時可以通過管道的方式傳輸多個轉換。

要使用Spark,開發者需要編寫一個Driver程式,它被送出到叢集以排程運作Worker,如下圖所示。Driver中定義了一個或多個RDD,并調用RDD上的action,Worker則執行RDD分區計算任務。

SparkCore 詳解一、 RDD概念二、 RDD程式設計

Dirver是啥? 

SparkContext是啥? 

Executor是啥? 

Master是啥? 

Worker是啥?

SparkCore 詳解一、 RDD概念二、 RDD程式設計

2.2 2.2 建立RDD

在Spark中建立RDD的建立方式大概可以分為三種:(1)、從集合中建立RDD;(2)、從外部存儲建立RDD;(3)、從其他RDD建立。

1) 由一個已經存在的Scala集合建立,集合并行化。

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

而從集合中建立RDD,Spark主要提供了兩種函數:parallelize和makeRDD。我們可以先看看這兩個函數的聲明:

def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T]

def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T]

def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
           

我們可以從上面看出makeRDD有兩種實作,而且第一個makeRDD函數接收的參數和parallelize完全一緻。其實第一種makeRDD函數實作是依賴了parallelize函數的實作,來看看Spark中是怎麼實作這個makeRDD函數的:

def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  parallelize(seq, numSlices)
}
           

我們可以看出,這個makeRDD函數完全和parallelize函數一緻。但是我們得看看第二種makeRDD函數函數實作了,它接收的參數類型是Seq[(T, Seq[String])],Spark文檔的說明是:

Distribute a local Scala collection to form an RDD, with one or more location preferences (hostnames of Spark nodes) for each object. Create a new partition for each collection item.

原來,這個函數還為資料提供了位置資訊,來看看我們怎麼使用:

scala> val rdd1= sc.parallelize(List(1,2,3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:21

scala> val rdd2 = sc.makeRDD(List(1,2,3))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at makeRDD at <console>:21

scala> val seq = List((1, List("slave01")),
     | (2, List("slave02")))
seq: List[(Int, List[String])] = List((1,List(slave01)),
 (2,List(slave02)))

scala> val rdd3 = sc.makeRDD(seq)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at makeRDD at <console>:23

scala> rdd3.preferredLocations(guigu3.partitions(1))
res26: Seq[String] = List(slave02)

scala>rdd3.preferredLocations(guigu3.partitions(0))
res27: Seq[String] = List(slave01)

scala> rdd1.preferredLocations(guigu1.partitions(0))
res28: Seq[String] = List()
           

我們可以看到,makeRDD函數有兩種實作,第一種實作其實完全和parallelize一緻;而第二種實作可以為資料提供位置資訊,而除此之外的實作和parallelize函數也是一緻的,如下:

def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
  assertNotStopped()
  val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
  new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
}
           

都是傳回ParallelCollectionRDD,而且這個makeRDD的實作不可以自己指定分區的數量,而是固定為seq參數的size大小。

2) 由外部存儲系統的資料集建立,包括本地的檔案系統,還有所有Hadoop支援的資料集,比如HDFS、Cassandra、HBase等

scala> val rdd= sc.textFile("hdfs://master01:9000/RELEASE")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://master01:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24
           

繼續閱讀