天天看點

SparkCore-RDD程式設計

一、程式設計模型

        在Spark中,RDD被表示為對象,通過對象上的方法調用來對RDD進行轉換。經過一系列的transformations定義RDD之後,就可以調用actions觸發RDD的計算,action可以是向應用程式傳回結果(count, collect等),或者是向存儲系統儲存資料(saveAsTextFile等)。在Spark中,隻有遇到action,才會執行RDD的計算(即延遲計算),這樣在運作時可以通過管道的方式傳輸多個轉換。

       要使用Spark,開發者需要編寫一個Driver程式,它被送出到叢集以排程運作Worker,如下圖所示。Driver中定義了一個或多個RDD,并調用RDD上的action,Worker則執行RDD分區計算任務。

SparkCore-RDD程式設計
SparkCore-RDD程式設計

二、RDD的建立

      在Spark中建立RDD的建立方式可以分為三種:從集合中建立RDD;從外部存儲建立RDD;從其他RDD建立。

1.從集合中建立

   從集合中建立RDD,Spark主要提供了兩種函數:parallelize和makeRDD

(1)使用parallelize()從集合建立

scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
           

(2)使用makeRDD()從集合建立

scala> val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
           

2.由外部存儲系統的資料集建立

   包括本地的檔案系統,還有所有Hadoop支援的資料集,比如HDFS、Cassandra、HBase等

scala> val rdd2= sc.textFile("hdfs://hadoop102:9000/RELEASE")
rdd2: org.apache.spark.rdd.RDD[String] = hdfs:// hadoop102:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24
           

3.從其他RDD建立

   算子的轉換形成RDD

三、RDD的轉換

   RDD整體上分為Value類型和Key-Value類型

詳情請見:https://blog.csdn.net/weixin_43233971/article/details/103076286

四、Action(行動算子)

1.reduce(function)

   作用:通過func函數聚集RDD中的所有元素,先聚合分區内資料,再聚合分區間資料。

建立一個RDD[Int]

scala> val rdd1 = sc.makeRDD(1 to 10,2)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24
           

聚合RDD[Int]所有元素

scala> rdd1.reduce(_+_)

res50: Int = 55
           

建立一個RDD[String]

scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))

rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24
           

聚合RDD[String]所有資料

scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))

res51: (String, Int) = (adca,12)
           

2.collect()

   作用:在驅動程式中,以數組的形式傳回資料集的所有元素。

建立一個RDD

scala> val rdd = sc.parallelize(1 to 10)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
           

(2)将結果收集到Driver端

scala> rdd.collect

res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)   
           

3.count()

   作用:傳回RDD中元素的個數

建立一個RDD

scala> val rdd = sc.parallelize(1 to 10)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
           

統計該RDD的條數

scala> rdd.count

res1: Long = 10
           

4.first()

   作用:傳回RDD中的第一個元素

建立一個RDD

scala> val rdd = sc.parallelize(1 to 10)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
           

統計該RDD的條數

scala> rdd.first

res2: Int = 1
           

5.take(n)

   作用:傳回一個由RDD的前n個元素組成的數組

建立一個RDD

scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

           

統計該RDD的條數

scala> rdd.take(3)

res10: Array[Int] = Array(2, 5, 4)
           

6.takeOrdered(n)

   作用:傳回該RDD排序後的前n個元素組成的數組

建立一個RDD

scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
           

(2)統計該RDD的條數

scala> rdd.takeOrdered(3)

res18: Array[Int] = Array(2, 3, 4)
           

7.aggregate 

  參數:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)

  作用:aggregate函數将每個分區裡面的元素通過seqOp和初始值進行聚合,然後用combine函數将每個分區的結果和初始值(zeroValue)              進行combine操作。這個函數最終傳回的類型不需要和RDD中元素類型一緻。

建立一個RDD

scala> var rdd1 = sc.makeRDD(1 to 10,2)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24
           

(2)将該RDD所有元素相加得到結果

scala> rdd.aggregate(0)(_+_,_+_)

res22: Int = 55
           

8.flod(num)(function)

   作用:折疊操作,aggregate的簡化操作,seqop和combop一樣。

建立一個RDD

scala> var rdd1 = sc.makeRDD(1 to 10,2)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24
           

(2)将該RDD所有元素相加得到結果

scala> rdd.fold(0)(_+_)

res24: Int = 55
           

9.saveAsTextFile(path)

作用:将資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對于每個元素,Spark将會調用toString方法,将它裝換為檔案中的文本

10.saveAsSequenceFile(path) 

作用:将資料集中的元素以Hadoop sequencefile的格式儲存到指定的目錄下,可以使HDFS或者其他Hadoop支援的檔案系統。

11.saveAsObjectFile(path) 

作用:用于将RDD中的元素序列化成對象,存儲到檔案中。

12.countByKey() 

    作用:針對(K,V)類型的RDD,傳回一個(K,Int)的map,表示每一個key對應的元素個數。

建立一個PairRDD

scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24
           

(2)統計每種key的個數

scala> rdd.countByKey

res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
           

13.foreach(function)

     作用:在資料集的每一個元素上,運作函數func進行更新。

建立一個RDD

scala> var rdd = sc.makeRDD(1 to 5,2)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24
           

(2)對該RDD每個元素進行列印

scala> rdd.foreach(println(_))

3

4

5

1

2

           

五、RDD中的函數傳遞

       在實際開發中我們往往需要自己定義一些對于RDD的操作,那麼此時需要主要的是,初始化工作是在Driver端進行的,而實際運作程式是在Executor端進行的,這就涉及到了跨程序通信,是需要序列化的。

1.建立一個類

class Search(s:String){
  //過濾出包含字元串的資料
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  //過濾出包含字元串的RDD
  def getMatch1 (rdd: RDD[String]): RDD[String] = {
    rdd.filter(isMatch)
  }
  //過濾出包含字元串的RDD
  def getMatche2(rdd: RDD[String]): RDD[String] = {
    rdd.filter(x => x.contains(query))
  }

}
           

2.建立Spark主程式

object SeriTest {
  def main(args: Array[String]): Unit = {
    //1.初始化配置資訊及SparkContext
    val sparkConf: SparkConf = new 
    SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    //2.建立一個RDD
    val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "atguigu"))
    //3.建立一個Search對象
    val search = new Search()
    //4.運用第一個過濾函數并列印結果
    val match1: RDD[String] = search.getMatche1(rdd)
    match1.collect().foreach(println)
    }
}
           

3.運作程式

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)

    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)

    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)

    at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)

    at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)

    at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)

    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

    at org.apache.spark.rdd.RDD.filter(RDD.scala:386)

    at com.atguigu.Search.getMatche1(SeriTest.scala:39)

    at com.atguigu.SeriTest$.main(SeriTest.scala:18)

    at com.atguigu.SeriTest.main(SeriTest.scala)

Caused by: java.io.NotSerializableException: com.atguigu.Search
           

4.問題說明

//過濾出包含字元串的RDD
  def getMatch1 (rdd: RDD[String]): RDD[String] = {
    rdd.filter(isMatch)
  }
           

        在這個方法中所調用的方法isMatch()是定義在Search這個類中的,實際上調用的是this. isMatch(),this表示Search這個類的對象,程式在運作過程中需要将Search對象序列化以後傳遞到Executor端。

5. 解決方案

使類繼承scala.Serializable即可。

class Search() extends Serializable{...}
           

六、RDD的依賴關系

1.Lineage

     RDD隻支援粗粒度轉換,即在大量記錄上執行的單個操作。将建立RDD的一系列Lineage(血統)記錄下來,以便恢複丢失的分區。RDD的Lineage會記錄RDD的中繼資料資訊和轉換行為,當該RDD的部分分區資料丢失時,它可以根據這些資訊來重新運算和恢複丢失的資料分區。

SparkCore-RDD程式設計

(1)讀取一個HDFS檔案并将其中内容映射成一個個元組

scala> val wordAndOne = sc.textFile("/fruit.tsv").flatMap(_.split("\t")).map((_,1))

wordAndOne: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:24
           

(2)統計每一種key對應的個數

scala> val wordAndCount = wordAndOne.reduceByKey(_+_)

wordAndCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:26
           

(3)檢視“wordAndOne”的Lineage

scala> wordAndOne.toDebugString

res5: String =
(2) MapPartitionsRDD[22] at map at <console>:24 []

 |  MapPartitionsRDD[21] at flatMap at <console>:24 []

 |  /fruit.tsv MapPartitionsRDD[20] at textFile at <console>:24 []

 |  /fruit.tsv HadoopRDD[19] at textFile at <console>:24 []
           

(4)檢視“wordAndCount”的Lineage

scala> wordAndCount.toDebugString

res6: String =
(2) ShuffledRDD[23] at reduceByKey at <console>:26 []

 +-(2) MapPartitionsRDD[22] at map at <console>:24 []

    |  MapPartitionsRDD[21] at flatMap at <console>:24 []

    |  /fruit.tsv MapPartitionsRDD[20] at textFile at <console>:24 []

    |  /fruit.tsv HadoopRDD[19] at textFile at <console>:24 []
           

(5)檢視“wordAndOne”的依賴類型

scala> wordAndOne.dependencies

res7: Seq[org.apache.spark.Dependency[_]] = List([email protected])
           

(6)檢視“wordAndCount”的依賴類型

scala> wordAndCount.dependencies

res8: Seq[org.apache.spark.Dependency[_]] = List([email protected])
           

注意:RDD和它依賴的父RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

2.窄依賴

   窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用,窄依賴我們形象的比喻為獨生子女

SparkCore-RDD程式設計

3.寬依賴

   寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition,會引起shuffle,總結:寬依賴我們形象的比喻為超生

SparkCore-RDD程式設計

4.DAG

     DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就就形成了DAG,根據RDD之間的依賴關系的不同将DAG劃分成不同的Stage,對于窄依賴,partition的轉換處理在Stage中完成計算。對于寬依賴,由于有Shuffle的存在,隻能在parent RDD處理完成後,才能開始接下來的計算,是以寬依賴是劃分Stage的依據。

SparkCore-RDD程式設計

5.任務劃分

RDD任務切分中間分為:Application、Job、Stage和Task

1)Application:初始化一個SparkContext即生成一個Application

2)Job:一個Action算子就會生成一個Job

3)Stage:根據RDD之間的依賴關系的不同将Job劃分成不同的Stage,遇到一個寬依賴則劃分一個Stage。

SparkCore-RDD程式設計

4)Task:Stage是一個TaskSet,将Stage劃分的結果發送到不同的Executor執行即為一個Task。

注意:Application->Job->Stage-> Task每一層都是1對n的關系。

6.RDD緩存

(1)DD通過persist方法或cache方法可以将前面的計算結果緩存,預設情況下 persist() 會把資料以序列化的形式緩存在 JVM 的堆空間中。

(2)但是并不是這兩個方法被調用時立即緩存,而是觸發後面的action時,該RDD将會被緩存在計算節點的記憶體中,并供後面重用。

SparkCore-RDD程式設計

(3)通過檢視源碼發現cache最終也是調用了persist方法,預設的存儲級别都是僅在記憶體存儲一份,Spark的存儲級别還有好多種,存儲級别在object StorageLevel中定義的。

SparkCore-RDD程式設計

在存儲級别的末尾加上“_2”來把持久化資料存為兩份

SparkCore-RDD程式設計

       緩存有可能丢失,或者存儲于記憶體的資料由于記憶體不足而被删除,RDD的緩存容錯機制保證了即使緩存丢失也能保證計算的正确執行。通過基于RDD的一系列轉換,丢失的資料會被重算,由于RDD的各個Partition是相對獨立的,是以隻需要計算丢失的部分即可,并不需要重算全部Partition。

7. RDD CheckPoint(檢查點)

       Spark中對于資料的儲存除了持久化操作之外,還提供了一種檢查點的機制,檢查點(本質是通過将RDD寫入Disk做檢查點)是為了通過lineage做容錯的輔助,lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之後有節點出現問題而丢失分區,從做檢查點的RDD開始重做Lineage,就會減少開銷。檢查點通過将資料寫入到HDFS檔案系統實作了RDD的檢查點功能。

      為目前RDD設定檢查點。該函數将會建立一個二進制的檔案,并存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir()設定的。在checkpoint的過程中,該RDD的所有依賴于父RDD中的資訊将全部被移除。對RDD進行checkpoint操作并不會馬上被執行,必須執行Action操作才能觸發。

(1)設定檢查點

scala> sc.setCheckpointDir("hdfs://hdp-1:9000/checkpoint")
           

(2)建立一個RDD

scala> val rdd = sc.parallelize(Array("hahah"))

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:24
           

(3)将RDD轉換為攜帶目前時間戳并做checkpoint

scala> val ch = rdd.map(_+System.currentTimeMillis)

ch: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:26
scala> ch.checkpoint
           

(4)多次列印結果

scala> ch.collect

res55: Array[String] = Array(atguigu1538981860336)

scala> ch.collect

res56: Array[String] = Array(atguigu1538981860504)



scala> ch.collect

res57: Array[String] = Array(atguigu1538981860504)



scala> ch.collect

res58: Array[String] = Array(atguigu1538981860504)
           

繼續閱讀