1、算子的簡單介紹
Transformation(轉換)算子:根據資料集建立一個新的資料集,計算後傳回一個新RDD,例如一個rdd進行map操作後生了一個新的rdd。
Action(動作)算子:對rdd結果計算後傳回一個數值value給驅動程式(driver),例如collect算子将資料集的所有元素收集完成傳回給驅動程式。
控制算子:對資料集進行特殊操作,例如cache算子将對于重複使用的算子,進行cache做緩存使用,資料隻儲存在記憶體中,性能提升。
懶執行:Spark中轉化算子和控制算子是懶執行的,需要Action算子觸發才能執行。
懶執行就是延遲計算的意思,就像是建立了一個視圖,他并不是把查詢好的資料放入視圖了,而是當你需要這些資料時,檢視視圖時,他才執行定義視圖時候的SQL語句。
注意:
Driver即運作Application的main()函數并且建立SparkContext。
Application使用者編寫的Spark應用程式。
SparkContext整個應用的上下文、控制應用的生命周期。
job即在每一個application中,有幾個action,就會産生幾個job。
2、算子的使用
2.1、常用的Transformation算子
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val pairRdd = sc.parallelize(List((1,1), (5,10), (5,9), (2,4), (3,5), (3,6),(4,7), (4,8),(2,3), (1,2)),4)
//map(函數),一一映射,分區數量不變,有多少條資料,就被會運作多少次。
val value1: RDD[(Int, Int)] = pairRdd.map(x => (x._1, x._2 + 1))
//定義一個疊代函數
val f1 = (iter:Iterator[(Int, Int)]) => iter.map(x=>(x._1,x._2*2))
//mapPartitions函數可以認為是Map的變種,可以對分區進行并行處理,兩者的差別是調用的顆粒度不一樣,map的輸入函數是應用于RDD的每個元素,而mapPartition的輸入函數是應用于RDD的每個分區。
val value2: RDD[(Int, Int)] = pairRdd.mapPartitions(f1)
//定義一個疊代函數
val f2 = (index:Int,iter:Iterator[(Int, Int)]) => iter.map((index,_))
//mapPartitionsWithIndex函數類似于mapPartitions,但func帶有一個整數參數表示分片的索引值,是以在類型為T的RDD上運作時,func的函數類型必須是(Int, Interator[T]) => Iterator[U]
val value3: RDD[(Int, (Int, Int))] = pairRdd.mapPartitionsWithIndex(f2)
//filter函數,傳回一個新的RDD,該RDD由經過func函數計算後傳回值為true的輸入元素組成,fitler并不會改變分區的數量,之前有幾個,現在仍然有幾個分區。
val value4: RDD[(Int, Int)] = pairRdd.filter(x => x._2 % x._1 == 0)
//flatMap函數: map之後,再flatten。每一個輸入元素可以被映射為0或多個輸出元素(是以func應該傳回一個序列,而不是單一進制素)
val value5: RDD[Char] = pairRdd.flatMap(x => x._1.toString + x._2.toString)
//groupByKey函數:在一個(K,V)的RDD上調用,傳回一個(K, Iterator[V])的RDD,可重新指定分區個數
val value6: RDD[(Int, Iterable[Int])] = pairRdd.groupByKey(1)
//groupBy函數與groupByKey類似,底層調用groupByKey,傳回一個(K, Iterator[V])的RDD,可重新指定分區個數
val value7: RDD[(Int, Iterable[(Int, Int)])] = pairRdd.groupBy(_._1, 1)
//groupBy後要搭配MapValues()使用
//reduceByKey函數:在一個(K,V)的RDD上調用,傳回一個(K,V)的RDD,使用指定的reduce函數,将相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設定
val value8: RDD[(Int, Int)] = pairRdd.reduceByKey(_ + _, 1)
//aggregateByKey 聚合類算子:初始化在每一個分區聚合中參數運作,但是在全局聚合中,不參與,類似于reduceByKey,但多個初始值
val value11: RDD[(Int, Int)] = pairRdd.aggregateByKey(100)(_ + _, _ + _)
//sortByKey函數:在一個(K,V)的RDD上調用,K必須實作Ordered接口,傳回一個按照key進行排序的(K,V)的RDD,是全局排序,可指定分區數量
//可設定排序規則,預設是從小到大排序,是true, 如果想從大到小排,設定為false
val value12: RDD[(Int, Int)] = pairRdd.sortByKey(false,2)
//sortBy函數:與sortByKey類似,但是更靈活,可設定排序的字段
val value13: RDD[(Int, Int)] = pairRdd.sortBy(_._1, false, 2)
//union函數:對源RDD和參數RDD求并集後傳回一個新的RDD
val value14: RDD[(Int, Int)] = value1.union(value2)
//intersection函數:對源RDD和參數RDD求交集後傳回一個新的RDD
val value15: RDD[(Int, Int)] = value1.intersection(value2)
//subtract函數:對源RDD和參數RDD求差集後傳回一個新的RDD
val value16: RDD[(Int, Int)] = value1.subtract(value2)
//join函數:對源RDD和參數RDD進行join傳回一個新的RDD
val value17: RDD[(Int, (Int, Int))] = value1.join(value2)
//cogroup函數:對源RDD和參數RDD進行全外關聯傳回一個新的RDD,相當于SQL中的全外關聯full outer join,傳回左右RDD中的記錄,關聯不上的為空。
val value18: RDD[(Int, (Iterable[Int], Iterable[Int]))] = value1.cogroup(value2)
//cartesian函數:對源RDD和參數RDD進行笛卡爾積傳回一個新的RDD
val value19: RDD[((Int, Int), (Int, Int))] = value1.cartesian(value2)
//coalesce:對RDD重新進行分區
val value20: RDD[((Int, Int), (Int, Int))] = value19.coalesce(10)
//repartition:類似于coalesce方法,底層就是用的coalesce方法,對RDD重新進行分區
val value21: RDD[((Int, Int), (Int, Int))] = value20.repartition(5)
//去重:distinct(分區數量)。
val value22: RDD[((Int, Int), (Int, Int))] = value19.distinct(12)
//等價于
val value23: RDD[((Int, Int), (Int, Int))] =value19.map(x=>(x,null)).reduceByKey((x,y) => x).map(_._1)
2.2、常用的ACtion算子
//reduce函數與reduceByKey類似,傳回一個結果的RDD,+ ,++ 取決rdd的元素類型,string類型使用++
val value9: Int = pairRdd.map(_._2).reduce(_ + _)
//aggregate 聚合類算子:存在兩次聚合。局部聚合和全局聚合。
//先局部計算(100+分區一的内容) (100+分區二的内容)+...
//再全局計算100+(100+分區一的内容)+ (100+分區二的内容)+...
val value10: Int = pairRdd.map(_._2).aggregate(100)(_ + _, _ + _)
//collect 在驅動程式中,以數組的形式傳回資料集的所有元素
val tuples: Array[((Int, Int), (Int, Int))] = value21.collect()
//foreach 在資料集的每一個元素上,運作函數func
val unit: Unit = value11.foreach(println(_))
//foreach 疊代的是每一個分區的資料
//如果我們需要去擷取mysql的連接配接,RDD中有10000 條資料,有10個分區。
//foreach:擷取10000次連接配接,性能低下。
//foreapartition: 隻需要擷取10次連接配接,每一個分區中的資料,共用一個連接配接。 性能優越。
value21.foreachPartition(println(_))
//saveAsTextFile 将資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對于每個元素,Spark将會調用toString方法,将它裝換為檔案中的文本
value21.saveAsTextFile("路徑")
//count() 傳回RDD的元素個數
val tuples1: Array[((Int, Int), (Int, Int))] = value19.collect()
//first() 傳回RDD的第一個元素(類似于take(1))
val tuple: ((Int, Int), (Int, Int)) = value19.first()
//take(n) 傳回一個由資料集的前n個元素組成的數組
val tuples2: Array[((Int, Int), (Int, Int))] = value19.take(10)
注意:
sortBy 是按照 RangePartitioner作為分區器
groupByKey 和 reduceByKey 中優先使用reduceByKey,因為reduceByKey會有一個局部的聚合,性能更好。
ByKey的都是轉換算子,沒有Key的都是action類算子。aggregateByKey和reduceByKey是轉化算子,aggregate和reduce是執行算子
2.3、控制算子
cache | 對于重複使用的算子,進行cache做緩存使用,資料隻儲存在記憶體中,性能提升 |
---|---|
persist | 性能提升 |
checkPoint | 資料容錯,當資料計算的時候,機器挂了,重新追溯到checkPoint的目錄下checkPoint是将RDD持久化到磁盤中,還可以切斷RDD之間的依賴關系 |