一、程式設計模型
在Spark中,RDD被表示為對象,通過對象上的方法調用來對RDD進行轉換。經過一系列的transformations定義RDD之後,就可以調用actions觸發RDD的計算,action可以是向應用程式傳回結果(count, collect等),或者是向存儲系統儲存資料(saveAsTextFile等)。在Spark中,隻有遇到action,才會執行RDD的計算(即延遲計算),這樣在運作時可以通過管道的方式傳輸多個轉換。
要使用Spark,開發者需要編寫一個Driver程式,它被送出到叢集以排程運作Worker,如下圖所示。Driver中定義了一個或多個RDD,并調用RDD上的action,Worker則執行RDD分區計算任務。

二、RDD的建立
在Spark中建立RDD的建立方式可以分為三種:從集合中建立RDD;從外部存儲建立RDD;從其他RDD建立。
1.從集合中建立
從集合中建立RDD,Spark主要提供了兩種函數:parallelize和makeRDD
(1)使用parallelize()從集合建立
scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
(2)使用makeRDD()從集合建立
scala> val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
2.由外部存儲系統的資料集建立
包括本地的檔案系統,還有所有Hadoop支援的資料集,比如HDFS、Cassandra、HBase等
scala> val rdd2= sc.textFile("hdfs://hadoop102:9000/RELEASE")
rdd2: org.apache.spark.rdd.RDD[String] = hdfs:// hadoop102:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24
3.從其他RDD建立
算子的轉換形成RDD
三、RDD的轉換
RDD整體上分為Value類型和Key-Value類型
詳情請見:https://blog.csdn.net/weixin_43233971/article/details/103076286
四、Action(行動算子)
1.reduce(function)
作用:通過func函數聚集RDD中的所有元素,先聚合分區内資料,再聚合分區間資料。
建立一個RDD[Int]
scala> val rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24
聚合RDD[Int]所有元素
scala> rdd1.reduce(_+_)
res50: Int = 55
建立一個RDD[String]
scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24
聚合RDD[String]所有資料
scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))
res51: (String, Int) = (adca,12)
2.collect()
作用:在驅動程式中,以數組的形式傳回資料集的所有元素。
建立一個RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
(2)将結果收集到Driver端
scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
3.count()
作用:傳回RDD中元素的個數
建立一個RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
統計該RDD的條數
scala> rdd.count
res1: Long = 10
4.first()
作用:傳回RDD中的第一個元素
建立一個RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
統計該RDD的條數
scala> rdd.first
res2: Int = 1
5.take(n)
作用:傳回一個由RDD的前n個元素組成的數組
建立一個RDD
scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
統計該RDD的條數
scala> rdd.take(3)
res10: Array[Int] = Array(2, 5, 4)
6.takeOrdered(n)
作用:傳回該RDD排序後的前n個元素組成的數組
建立一個RDD
scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
(2)統計該RDD的條數
scala> rdd.takeOrdered(3)
res18: Array[Int] = Array(2, 3, 4)
7.aggregate
參數:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
作用:aggregate函數将每個分區裡面的元素通過seqOp和初始值進行聚合,然後用combine函數将每個分區的結果和初始值(zeroValue) 進行combine操作。這個函數最終傳回的類型不需要和RDD中元素類型一緻。
建立一個RDD
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24
(2)将該RDD所有元素相加得到結果
scala> rdd.aggregate(0)(_+_,_+_)
res22: Int = 55
8.flod(num)(function)
作用:折疊操作,aggregate的簡化操作,seqop和combop一樣。
建立一個RDD
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24
(2)将該RDD所有元素相加得到結果
scala> rdd.fold(0)(_+_)
res24: Int = 55
9.saveAsTextFile(path)
作用:将資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對于每個元素,Spark将會調用toString方法,将它裝換為檔案中的文本
10.saveAsSequenceFile(path)
作用:将資料集中的元素以Hadoop sequencefile的格式儲存到指定的目錄下,可以使HDFS或者其他Hadoop支援的檔案系統。
11.saveAsObjectFile(path)
作用:用于将RDD中的元素序列化成對象,存儲到檔案中。
12.countByKey()
作用:針對(K,V)類型的RDD,傳回一個(K,Int)的map,表示每一個key對應的元素個數。
建立一個PairRDD
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24
(2)統計每種key的個數
scala> rdd.countByKey
res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
13.foreach(function)
作用:在資料集的每一個元素上,運作函數func進行更新。
建立一個RDD
scala> var rdd = sc.makeRDD(1 to 5,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24
(2)對該RDD每個元素進行列印
scala> rdd.foreach(println(_))
3
4
5
1
2
五、RDD中的函數傳遞
在實際開發中我們往往需要自己定義一些對于RDD的操作,那麼此時需要主要的是,初始化工作是在Driver端進行的,而實際運作程式是在Executor端進行的,這就涉及到了跨程序通信,是需要序列化的。
1.建立一個類
class Search(s:String){
//過濾出包含字元串的資料
def isMatch(s: String): Boolean = {
s.contains(query)
}
//過濾出包含字元串的RDD
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
//過濾出包含字元串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {
rdd.filter(x => x.contains(query))
}
}
2.建立Spark主程式
object SeriTest {
def main(args: Array[String]): Unit = {
//1.初始化配置資訊及SparkContext
val sparkConf: SparkConf = new
SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
//2.建立一個RDD
val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "atguigu"))
//3.建立一個Search對象
val search = new Search()
//4.運用第一個過濾函數并列印結果
val match1: RDD[String] = search.getMatche1(rdd)
match1.collect().foreach(println)
}
}
3.運作程式
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
at com.atguigu.Search.getMatche1(SeriTest.scala:39)
at com.atguigu.SeriTest$.main(SeriTest.scala:18)
at com.atguigu.SeriTest.main(SeriTest.scala)
Caused by: java.io.NotSerializableException: com.atguigu.Search
4.問題說明
//過濾出包含字元串的RDD
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
在這個方法中所調用的方法isMatch()是定義在Search這個類中的,實際上調用的是this. isMatch(),this表示Search這個類的對象,程式在運作過程中需要将Search對象序列化以後傳遞到Executor端。
5. 解決方案
使類繼承scala.Serializable即可。
class Search() extends Serializable{...}
六、RDD的依賴關系
1.Lineage
RDD隻支援粗粒度轉換,即在大量記錄上執行的單個操作。将建立RDD的一系列Lineage(血統)記錄下來,以便恢複丢失的分區。RDD的Lineage會記錄RDD的中繼資料資訊和轉換行為,當該RDD的部分分區資料丢失時,它可以根據這些資訊來重新運算和恢複丢失的資料分區。
(1)讀取一個HDFS檔案并将其中内容映射成一個個元組
scala> val wordAndOne = sc.textFile("/fruit.tsv").flatMap(_.split("\t")).map((_,1))
wordAndOne: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:24
(2)統計每一種key對應的個數
scala> val wordAndCount = wordAndOne.reduceByKey(_+_)
wordAndCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:26
(3)檢視“wordAndOne”的Lineage
scala> wordAndOne.toDebugString
res5: String =
(2) MapPartitionsRDD[22] at map at <console>:24 []
| MapPartitionsRDD[21] at flatMap at <console>:24 []
| /fruit.tsv MapPartitionsRDD[20] at textFile at <console>:24 []
| /fruit.tsv HadoopRDD[19] at textFile at <console>:24 []
(4)檢視“wordAndCount”的Lineage
scala> wordAndCount.toDebugString
res6: String =
(2) ShuffledRDD[23] at reduceByKey at <console>:26 []
+-(2) MapPartitionsRDD[22] at map at <console>:24 []
| MapPartitionsRDD[21] at flatMap at <console>:24 []
| /fruit.tsv MapPartitionsRDD[20] at textFile at <console>:24 []
| /fruit.tsv HadoopRDD[19] at textFile at <console>:24 []
(5)檢視“wordAndOne”的依賴類型
scala> wordAndOne.dependencies
res7: Seq[org.apache.spark.Dependency[_]] = List([email protected])
(6)檢視“wordAndCount”的依賴類型
scala> wordAndCount.dependencies
res8: Seq[org.apache.spark.Dependency[_]] = List([email protected])
注意:RDD和它依賴的父RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
2.窄依賴
窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用,窄依賴我們形象的比喻為獨生子女
3.寬依賴
寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition,會引起shuffle,總結:寬依賴我們形象的比喻為超生
4.DAG
DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就就形成了DAG,根據RDD之間的依賴關系的不同将DAG劃分成不同的Stage,對于窄依賴,partition的轉換處理在Stage中完成計算。對于寬依賴,由于有Shuffle的存在,隻能在parent RDD處理完成後,才能開始接下來的計算,是以寬依賴是劃分Stage的依據。
5.任務劃分
RDD任務切分中間分為:Application、Job、Stage和Task
1)Application:初始化一個SparkContext即生成一個Application
2)Job:一個Action算子就會生成一個Job
3)Stage:根據RDD之間的依賴關系的不同将Job劃分成不同的Stage,遇到一個寬依賴則劃分一個Stage。
4)Task:Stage是一個TaskSet,将Stage劃分的結果發送到不同的Executor執行即為一個Task。
注意:Application->Job->Stage-> Task每一層都是1對n的關系。
6.RDD緩存
(1)DD通過persist方法或cache方法可以将前面的計算結果緩存,預設情況下 persist() 會把資料以序列化的形式緩存在 JVM 的堆空間中。
(2)但是并不是這兩個方法被調用時立即緩存,而是觸發後面的action時,該RDD将會被緩存在計算節點的記憶體中,并供後面重用。
(3)通過檢視源碼發現cache最終也是調用了persist方法,預設的存儲級别都是僅在記憶體存儲一份,Spark的存儲級别還有好多種,存儲級别在object StorageLevel中定義的。
在存儲級别的末尾加上“_2”來把持久化資料存為兩份
緩存有可能丢失,或者存儲于記憶體的資料由于記憶體不足而被删除,RDD的緩存容錯機制保證了即使緩存丢失也能保證計算的正确執行。通過基于RDD的一系列轉換,丢失的資料會被重算,由于RDD的各個Partition是相對獨立的,是以隻需要計算丢失的部分即可,并不需要重算全部Partition。
7. RDD CheckPoint(檢查點)
Spark中對于資料的儲存除了持久化操作之外,還提供了一種檢查點的機制,檢查點(本質是通過将RDD寫入Disk做檢查點)是為了通過lineage做容錯的輔助,lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之後有節點出現問題而丢失分區,從做檢查點的RDD開始重做Lineage,就會減少開銷。檢查點通過将資料寫入到HDFS檔案系統實作了RDD的檢查點功能。
為目前RDD設定檢查點。該函數将會建立一個二進制的檔案,并存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir()設定的。在checkpoint的過程中,該RDD的所有依賴于父RDD中的資訊将全部被移除。對RDD進行checkpoint操作并不會馬上被執行,必須執行Action操作才能觸發。
(1)設定檢查點
scala> sc.setCheckpointDir("hdfs://hdp-1:9000/checkpoint")
(2)建立一個RDD
scala> val rdd = sc.parallelize(Array("hahah"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:24
(3)将RDD轉換為攜帶目前時間戳并做checkpoint
scala> val ch = rdd.map(_+System.currentTimeMillis)
ch: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:26
scala> ch.checkpoint
(4)多次列印結果
scala> ch.collect
res55: Array[String] = Array(atguigu1538981860336)
scala> ch.collect
res56: Array[String] = Array(atguigu1538981860504)
scala> ch.collect
res57: Array[String] = Array(atguigu1538981860504)
scala> ch.collect
res58: Array[String] = Array(atguigu1538981860504)