天天看點

Spark常見算子總結Transformation:Action:

官方文檔上列舉共有32種常見算子,包括Transformation的20種操作和Action的12種操作。

内容自己用心整理,看我的就夠了

Transformation:

1.map

map的輸入變換函數應用于RDD中所有元素,而mapPartitions應用于所有分區。差別于mapPartitions主要在于調用粒度不同。如parallelize(1 to 10, 3),map函數執行10次,而mapPartitions函數執行3次。

object TestTransformation {
  def main(args: Array[String]): Unit = { 
    //0.先建立SparkContext對象
    val conf = new SparkConf().setAppName("TestTransformation").setMaster("local")
    val sc = new SparkContext(conf)
    //1.建立RDD
    val rdd: RDD[Int] = sc.parallelize(List(5,6,4,7,3,9,2,10,8))
    //1.周遊RDD中每一個元素并進行操作 參數是一個處理RDD中資料的函數
    val rdd2: RDD[Int] = rdd.map(_ * 2)
    //行動算子 collect 将RDD中資料轉換成一個集合進行存儲   
    println(rdd2.collect().toBuffer)
           

2.filter

參數(function)

過濾操作,滿足filter内function函數為true的RDD内所有元素組成一個新的資料集。如:filter(a == 1)。

3.flatMap

參數(function)

map是對RDD中元素逐一進行函數操作映射為另外一個RDD,而flatMap操作是将函數應用于RDD之中的每一個元素,将傳回的疊代器的所有内容構成新的RDD。

flatMap與map差別在于map為“映射”,而flatMap“先映射,後扁平化”,map對每一次(func)都産生一個元素,傳回一個對象,而flatMap多一步就是将所有對象合并為一個對象。

//3.周遊RDD中每一個元素并将元素進行扁平化處理
val rdd4 = sc.parallelize(Array("a b c","b c d"))
//得到集合中集合的效果 而非集合中存儲資料
//val rdd5: RDD[Array[String]] = rdd4.map(_.split(" ")) 
val rdd5: RDD[String] = rdd4.flatMap(_.split(" "))
println(rdd5.collect().toBuffer)
           

4.mapPartitions

參數(function)

區于foreachPartition(屬于Action,且無傳回值),而mapPartitions可擷取傳回值。與map的差別前面已經提到過了,但由于單獨運作于RDD的每個分區上(block),是以在一個類型為T的RDD上運作時,(function)必須是Iterator => Iterator類型的方法(入參)。

//周遊出集合中每一個元素并進行操作 //建立了一個rdd1,資料作用在3個分區上
val rdd1 =sc.parallelize(List(1,2,3,4,5,6),3)
/*
mapPartitions是對每一個分區中資料進行疊代(資料處理 操作)
f: Iterator[T] => Iterator[U] 第一個參數疊代器是對象[分區對象] preservesPartitioning: Boolean = false 預設值 是否保留父RDD的分區資訊(此值不傳
入)
mapPartitions 和 map算子 如果是一個分區周遊的效果是沒有什麼差別的
但是建議在實際開發中使用mapPartitions算子代替Map,可以加載資料的處理 
ps:如果RDD中資料量過大比如10億條,此時堅決不要使用mapPartitions進行資料周遊,可能會
出現OOM記憶體溢出 */
rdd1.mapPartitions(_.map(_*10))
//解釋:第一個下滑線代表的是對應的分區即rdd有幾個分區,下劃線就會被執行多少次
//例如: 分區是3 --> 資料可能會被分為 1,2 一個分區 3,4 一個分區 5,6 一個分區 此時_ 代表的就是每一個分區對象
// 第二個下劃線代表的是每一個分區内部的具體資料
//例如: 第一個下劃線觸發了第一個分區 得到資料是 1,2 第二個下劃線就有相當于是 1*10 2*10
           

5.mapPartitionsWithIndex

參數(function)

與mapPartitions類似,但需要提供一個表示分區索引值的整型值作為參數,是以function必須是(int, Iterator)=>Iterator類型的。

//mapPartitionsWithIndex 是對rdd中每個分區的周遊操作
//本質操作和mapPartitions類似,隻不過,可以更加具體的是對資料進行輸出 
/**
* f: (Int, Iterator[T]) => Iterator[U] 第一個參數是分區的疊代方式,疊代方式有兩 個值Int是分區值(第幾分區) Iterator代表的是分區内部具體的值
值不傳
* preservesPartitioning: Boolean = false 是否保留父RDD的Partition資訊 預設 */
val Iter = (index:Int,iter:Iterator[Int])=>{
     iter.map(x => {"[partID"+index+",value"+x+"]"})
}
val rdd3: RDD[String] = rdd1.mapPartitionsWithIndex(Iter)
 println(rdd3.collect.toList)
           

6.sample

參數(withReplacement, fraction, seed)

采樣操作,用于從樣本中取出部分資料。withReplacement是否放回,fraction采樣比例,seed用于指定的随機數生成器的種子。(是否傳回抽樣分true和false,fraction取樣比例為(0, 1]。seed種子為整型實數。)

//4.sample随機抽樣
	val rdd5_1 = sc.parallelize(1 to 10)
/**
* withReplacement: Boolean第一參數抽出資料是否傳回 true為傳回 false不傳回
* fraction: Double 第二參數 抽樣的比例 30% 傳入0.3即可,但是這個是浮動值 不一定準确
* seed: Long = Utils.random.nextLong 産生一個随機的種子 */
	val sample: RDD[Int] = rdd5_1.sample(false,0.5)
	println(sample.collect.toBuffer)
           

7.union

參數(otherDataSet)

對于源資料集和其他資料集求并集,不去重。

8.intersection

參數(otherDataSet)

對于源資料集和其他資料集求交集,并去重,且無序傳回。

9.distinct

參數([numTasks])

這個numTasks并不跟分區有關系,而可以了解為一個數學概念中的“因子”。如果設定的numTasks能被資料集中元素整除,那麼排序就按先無序的排因子,後無序排非因子的組合(即相當于局部無序);如果設定的numTasks不能被資料集中所有元素整除,那麼排序會按照去重之前RDD排序的順序傳回。

注:之後groupByKey、reduceByKey、aggregateByKey、sortByKey、join、cogroup等Transformation操作均包含[numTasks]任務數這個參數,參考上一行連結了解。

傳回一個在源資料集去重之後的新資料集,即去重,并局部無序而整體有序傳回。

//5.union求并集 intersection 交集
val rdd6 = sc.parallelize(List(5,6,7,8))
val rdd7 = sc.parallelize(List(1,2,5,6))
println( (rdd6 union rdd7).collect.toBuffer)
println( (rdd6 intersection rdd7).collect.toBuffer)
//6.去除重複 distinct
val rdd9 = sc.parallelize(List(1,2,2,3,4,5,2,6,9))
println(rdd9.distinct.collect.toBuffer)
           

10.groupByKey

([numTasks])

在一個PairRDD或(k,v)RDD上調用,傳回一個(k,Iterable)。主要作用是将相同的所有的鍵值對分組到一個集合序列當中,其順序是不确定的。groupByKey是把所有的鍵值對集合都加載到記憶體中存儲計算,若一個鍵對應值太多,則易導緻記憶體溢出。

//分組
//第一種 是根據傳入指定的參數進行分組
val rdd11_4_1: RDD[((String, Int), Iterable[((String, Int), (String,
Int))])] = rdd11_3.groupBy(_._1)
//第二種 根據key進行分組(需要是一個對偶元組),根據key進行分組
val rdd11_4_2: RDD[((String, Int), Iterable[(String, Int)])] =
rdd11_3.groupByKey()
           

11.reduceByKey

(function,[numTasks])

與groupByKey類似,卻有不同。如(a,1), (a,2), (b,1), (b,2)。groupByKey産生中間結果為( (a,1), (a,2) ), ( (b,1), (b,2) )。而reduceByKey為(a,3), (b,3)。

reduceByKey主要作用是聚合,groupByKey主要作用是分組。(function對于key值來進行聚合)

//求和
//除了XXXBykey之外,還有一個求和的算子,不此算子是action算子 reduce 
//1.reduceBykey将相同key 為一組進行聚合,得到最終結果集
val rdd6 = sc.parallelize(Array(("tom",1),("jerry",3),("kitty",2),
("jerry",2)))
val sumed: RDD[(String, Int)] = rdd6.reduceByKey(_+_)
/**
觸發原則: reduceByKey會尋找相同的key,然後進行相加求和計算 第一次 第一個下劃線代表對應key中value的值
第二個下劃線代表對應key中value的值
然後求和
後續求和 : 第一個下劃線會擷取上一次計算的結果
第二個下劃線會擷取key中對應value的值 直到RDD沒有相同key時,此時reduceBykey結束一次計算 */

           

12.aggregateByKey

(zeroValue)(seqOp, combOp, [numTasks])

( zeroValue: U )( seqOp: (U, V) => U , combOp: (U, U) => U )

zeroValue預設參數值即初始值 會參與到分區中計算

seqOp 局部聚合(求分區内部的資料和)

combOp 全局聚合(求所有分區資料聚合後的求和)

類似reduceByKey,對pairRDD中相同的key值進行聚合操作

在kv對中RDD按照key将value進行分組合并,合并的時将會每個value和初始值作為seqOp函數參數

進行使用,進行計算的同時,會傳回一個結果 作為一個新的kv對存在,然後将結果再按照key進行合并,最後,将每個分區中value傳遞給combOp函數在此進行計算(現将前兩value值進行計算,然後傳回 結果和下一個value再進行計算 以此類推)

val rdd8 = sc.parallelize(List(("cat",2),("cat",5),("pig",10),("dog",3),
                               ("dog",4),("cat",4)),2)
def func[T](index:Int,iter: Iterator[(T)])={
  iter.map(x=>{"[partID:"+index+" value:"+x})
}//寫一個函數将分區中資料周遊出來
println(rdd8.mapPartitionsWithIndex(func).collect.toList)
val value: RDD[(String, Int)] = rdd8.aggregateByKey(0)(math.max(_,_),_+_)
println(value.collect().toBuffer)// ArrayBuffer((dog,4), (pig,10), (cat,9))
分析:
//partID:0 value:(cat,2), partID:0 value:(cat,5), partID:0 value:(pig,10) -->分區一
// --> aggregateByKey分區計算 得到的結果是( (cat ,5),(pig,10))
// partID:1 value:(dog,3), partID:1 value:(dog,4), partID:1 value:(cat,4)) --> 分區二
// --> aggregateByKey分區計算 得到的結果是( (dog ,4),(cat,4))
//全局聚合會将所有分區中得到的結果看做是一個分區即 ((cat ,5),(pig,10),(dog ,4), (cat,4))
// 相同key為一組 觸發一次計算 (cat ,9) (pig,10) ,(dog ,4)

           

12_1.combineByKey

/**
* 求和
* combineByKey 相同key為一組,把value合并計算
* createCombiner: V => C 會周遊分區中所有的元素,是以每個元素的key要麼是遇到(等 同于是相同key)要麼就是沒有遇到過的(等同于是不同的key)
* 如果這個元素是一個新的元素,使用createCombiner()函數 對這個可以進行第一次的初始累加
* mergeValue: (C, V) => C 如果是第一次處理目前分區中遇到的key,它會使用 mergeVlaue進行對該值的合并
mergeCombiners: (C, C) => 由于每個分區都是獨立處理,因為對于同一個key可以進行多個累加
ps:使用方式類似aggregateByKey,但是第一個參數是周遊RDD中每元素,剩餘的求和位置是相同的
combineByKey不允許使用 _ 代表 參數
*/
val rdd9 = sc.parallelize(List(("cat",2),("cat",5),("pig",10),("dog",3),
("dog",4),("cat",4)),2)
val rdd9_1 = rdd9.combineByKey(x=>x,(a:Int,b:Int)=>a+b,
(m:Int,n:Int)=>m+n)
      println(rdd9_1.collect.toList)


           

13.sortByKey

([ascending], [numTasks])

排序算子,除了spark中内置排序算子之外,可以先将RDD通過collect算子轉換為Scala版本的集合,然後 再進行排序,再對資料轉換RDD

同樣是基于pairRDD的,根據key值來進行排序。ascending升序,預設為true,即升序;numTasks

//1.sortByKey 根據key進行排序,但是key必須具備可比較性,必須實作Ordered特質
val rdd4 = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
//說明: 是有一個參數的, 這個參數可以進行排序是升序還是降序,預設是true即升序, 降序false val sorted: RDD[(Int, String)] = rdd4.sortByKey()
       println(sorted.collect.toList)

//2.sortBy和Scala中sortBy是不一樣的 
//spark中sortBy是可以自定義升降序排序, scala中是不可以的
//sortBy比sortBykey更加靈活,sortBy可以根據第一個參數決定誰進行排序 val rdd5 = sc.parallelize(List(1,5,2,6,3,6,32,345,723,34))
      val sorted2: RDD[Int] = rdd5.sortBy(x=>x,false)
      println(sorted2.collect.toList)

           

14.join

(otherDataSet,[numTasks])

加入一個RDD,在一個(k,v)和(k,w)類型的dataSet上調用,傳回一個(k,(v,w))的pair dataSet。

//join 相同的key會合并, 相當于相同key為一組 value進行聚和(聚和不是求和,而是将值放到 一起)
    val rdd10_1 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))
    val rdd10_2 = sc.parallelize(List(("jerry",2),("tom",2),("dog",10)))
    val rdd10_3 = rdd10_1 join rdd10_2
    println(rdd10_3.collect.toBuffer)//ArrayBuffer((tom,(1,2)), (jerry,(3,2)))舍棄不同key的情況

           

14_1.leftOuterJoin、rightOuterJoin

//左連接配接和右連接配接 除了基準值之外 都是Option類型,因為值可能存在空值即null 
//左連接配接是以左邊為基準值 右邊沒有的為null
rdd10_1 leftOuterJoin rdd10_2
//右連接配接是以右邊為基準值,左變沒有的為null
rdd10_1 rightOuterJoin rdd10_2

           

15.cogroup

(otherDataSet,[numTasks])

合并兩個RDD,生成一個新的RDD。執行個體中包含兩個Iterable值,第一個表示RDD1中相同值,第二個表示RDD2中相同值(key值),這個操作需要通過partitioner進行重新分區,是以需要執行一次shuffle操作。(若兩個RDD在此之前進行過shuffle,則不需要)

//這種方式和groupByKey類似,根據相同key為一組進行分組
//不同點,groupBykey必須是對偶元組,若資料沒有形成對偶元組,必須進行合并 
//cogroup對資料并不需要合并的前提下就可以根據相同key進行分組
//ps:cogroup本質還是進行了合并
val rdd11_4_3: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11_1
cogroup rdd11_2
println(rdd11_4_3.collect().toBuffer)
//ArrayBuffer((tom,(CompactBuffer(1),CompactBuffer(2))), (dog,(CompactBuffer(),CompactBuffer(10))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))

           

16.cartesian

(otherDataSet)

求笛卡爾乘積。該操作不會執行shuffle操作。

//産生笛卡爾積cartesian
val rdd11_1 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2))) 
val rdd11_2 = sc.parallelize(List(("jerry",2),("tom",2),("dog",10)))
val rdd11_3 = rdd11_1 cartesian rdd11_2
     println(rdd11_3.collect.toBuffer)

           

17.pipe

(command,[envVars])

通過一個shell指令來對RDD各分區進行“管道化”。通過pipe變換将一些shell指令用于Spark中生成的新RDD,如:

val rdd: RDD[Int] = sc.parallelize(0 to 7 , 4)
val rdd1: RDD[Array[Int]] = rdd.glom()
val rdd2: RDD[String] = rdd.pipe("head -n 1")

println(rdd1.collect().toBuffer)//ArrayBuffer([[email protected], [[email protected], [[email protected], [[email protected])
//内部是數組:ArrayBuffer(Array(0,1),Array(2,3),Array(4,5),Array(6,7))
println(rdd2.collect().toBuffer)//ArrayBuffer(0, 2, 4, 6)
//截取每個分區中第一個元素作為新的RDD

           

18.coalesce

(numPartitions)

重新分區,**減少**RDD中分區的數量到numPartitions。

/**
* coalesce 更改分區
* 不能将小分區修改為大分區,因為不會發生shuffle
* 例如 原始分區是3 新分區是5 結果不會将資料存儲到新分區中 而是使用原始分區 --> 不
會發生shuffle *縮減分區,将原有分區進行縮減
*/
val reps2 = rdd6.coalesce(5) 
println("通過coalesce調整分區後的分區數量:"+reps1.partitions.length) println(reps2.mapPartitionsWithIndex(Iter).collect().toList)

           

19.repartition

(numPartitions)

repartition是coalesce接口中shuffle為true的簡易實作,即Reshuffle RDD并随機分區,使各分區資料量盡可能平衡。若分區之後分區數遠大于原分區數,則需要shuffle。

//第二參數可以指定分區值 --> 值是幾 分區就會分幾個
val  rdd6= sc.parallelize(List(1,2,3,4,5,6),3)
println("初始化分區:"+rdd6.partitions.length)
/**
*若使用repartition分區需要注意 如果從少量分區改變為多分區此時會發生shuffle * 例如 原有分區數量是3 修改後分區數量是5 -> 重新發生shuffle
*/
println(rdd6.mapPartitionsWithIndex(Iter).collect().toList)
val reps1: RDD[Int] = rdd6.repartition(5) 
println("通過repartition調整分區後的分區數量:"+reps1.partitions.length)
println(reps1.mapPartitionsWithIndex(Iter).collect().toList)

           

20.repartitionAndSortWithinPartitions

(partitioner)

該方法根據partitioner對RDD進行分區,并且在每個結果分區中按key進行排序。

/**
* repartition算子的一種變種 ,這個算子可以對分區内部的資料進行排序
* repartitionAndSortWithinPartitions 代用必須是二進制組(對偶元組)
* 參數可以是自定義分區 也可以是系統分區HashPartitioner
*官方建議:若果使用repartition算子從新分區之後,還需要排序,此時建議使用 repartitionAndSortWithinPartitions算子進行計算
* 該算子會在重新分區的同時進行排序(shuffle+sort)
* 如果單純使用repartition 在sort 就相當于是先shuffle完成後在sort */
rdd6_1.repartitionAndSortWithinPartitions(new
HashPartitioner(1)).foreach(println)

           

Action:

1.reduce

(function)

reduce将RDD中元素兩兩傳遞給輸入函數,同時産生一個新值,新值與RDD中下一個元素再被傳遞給輸入函數,直到最後隻有一個值為止。

object TestAction {
  def main(args: Array[String]): Unit = {
//1.reduce算子是屬于Action //此算子求和比較單一隻能使用在純數字計算的方式 val conf = new
SparkConf().setAppName("TestTransformation2").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List(1,2,3,4,5,6))
    val sum: Int = rdd.reduce(_+_)
println(sum)

           

2.collect()

将一個RDD以一個Array數組形式傳回其中的所有元素。

//以數組的形式傳回RDD中存儲資料 因為傳回的是Array不可變的是以需要轉換為 .toBuffer
//或.toList來檢視結果
println(rdd.collect().toBuffer)

           

3.count()

傳回資料集中元素個數,預設Long類型。

//傳回RDD中元素的個數
val count: Long = rdd.count()

           

4.first()

傳回資料集的第一個元素(類似于take(1))

//擷取RDD中第一個元素的值 和 take(1)效果是一樣的
println(rdd.first())

           

5.takeSample

(withReplacement, num, [seed])

對于一個資料集進行随機抽樣

num: 傳回一個包含num個随機抽樣元素的數組

withReplacement: 表示是否有傳回抽樣

seed: 指定生成随機數的種子。

該方法僅在預期結果數組很小的情況下使用,因為所有資料都被加載到driver端的記憶體中。

6.take(n)

傳回一個包含資料集前n個元素的數組(從0下标到n-1下标的元素),不排序。

println(rdd.take(3).toBuffer)

           

7.takeOrdered

(n,[ordering])

傳回RDD中前n個元素,并按預設順序排序(升序)或者按自定義比較器順序排序。

println(rdd.takeOrdered(3).toBuffer)
           

8.saveAsTextFile(path)

将dataSet中元素以文本檔案的形式寫入本地檔案系統或者HDFS等。Spark将對每個元素調用toString方法,将資料元素轉換為文本檔案中的一行記錄。

若将檔案儲存到本地檔案系統,那麼隻會儲存在executor所在機器的本地目錄。

//将結果寫成資料檔案(存儲到HDFS或存儲到本地) 
rdd.saveAsTextFile("檔案存儲路徑即可")
           

9.saveAsSequenceFile

(path)(Java and Scala)

将dataSet中元素以Hadoop SequenceFile的形式寫入本地檔案系統或者HDFS等。(對pairRDD操作)

10.saveAsObjectFile

(path)(Java and Scala)

将資料集中元素以ObjectFile形式寫入本地檔案系統或者HDFS等。

11.countByKey()

用于統計RDD[K,V]中每個K的數量,傳回具有每個key的計數的(k,int)pairs的hashMap。

//統計key的個數 生成的是map key值是key的名稱 value是key的個數
val rdd2 = sc.parallelize(List(("key1",1),("key2",2)))
val stringToLong1: collection.Map[String, Long] = rdd2.countByKey()
           

11_1.countByValue()

//統計元組的個數(統計value的個數,将集合中的每一個元素看做是一個value) val tupleToLong2: collection.Map[(String, Int), Long] =
rdd2.countByValue()
           

11_2.filterByRange()

//過濾對應RDD中元素指定範圍内的資料 --> 轉換算子
val rdd3 = sc.parallelize(List(("e",1),("c",2),("d",4),("c",5),("a",1))) val rdd3_1: RDD[(String, Int)] = rdd3.filterByRange("c", "e")// 包括開始和結束
--> 轉換算子
           

11_3.flatMapValue()

//對偶元組,value值進行扁平化處理 --> 轉換算子
val rdd4 = sc.parallelize(List(("a","1 2"),("b","3 4")))
val rdd4_1: RDD[(String, String)] = rdd4.flatMapValues(_.split(" "))
           

12.foreach

(function)

對資料集中每一個元素運作函數function。

//列印分區資料 /**
* 1.Action算子 直接列印RDD中資料,在列印的同時可以對資料一些操作 */
val rdd5 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3) rdd5.foreach(println) //直接列印RDD中資料 rdd5.foreach(x=>{
if(x>2)
println(x)
}) //對RDD中資料進行操作後輸出
           

12_1.foreachPartition

/*
Action算子, 直接列印RDD中對應分區的資料,多用于數資料的持久化(寫成檔案或存儲到資料庫) 多用于将資料存儲到資料庫中
*/
rdd5.foreachPartition(println) //直接列印RDD的資料 
rdd5.foreachPartition(x=>{println(x.reduce(_+_))}) //可以對分區中資料進行列印 
/**
* foreach和foreachPartition差別
* 1.在隻有一個分區的前提下foreach和foreachPartition列印的效果是一樣的
* 2.若需要列印多個分區中的值建議使用foreachPartition因為可以列印對應分區資料,
* 3.foreachPartition使用方式比foreach要多,因為foreachPartition可以列印分區适用于
将資料固化即寫成檔案或輸出到資料庫中
*/
           

12_2.KeyBy

(function)

KeyBy :以傳入的函數傳回值作為key,RDD中的元素作為value 形成新的元組

val rdd6 = sc.parallelize(List("dog","cat","pig")) //keyBy函數時可以周遊RDD中元素
val rdd6_1: RDD[(Int, String)] = rdd6.keyBy(_.length) //RDD中元素的長度是key
value是元素值
//keys 和 values 擷取所有的key 或 擷取所有的value collectAsMap 對偶元組轉換
為Map
           

繼續閱讀