官方文檔上列舉共有32種常見算子,包括Transformation的20種操作和Action的12種操作。
内容自己用心整理,看我的就夠了
Transformation:
1.map
map的輸入變換函數應用于RDD中所有元素,而mapPartitions應用于所有分區。差別于mapPartitions主要在于調用粒度不同。如parallelize(1 to 10, 3),map函數執行10次,而mapPartitions函數執行3次。
object TestTransformation {
def main(args: Array[String]): Unit = {
//0.先建立SparkContext對象
val conf = new SparkConf().setAppName("TestTransformation").setMaster("local")
val sc = new SparkContext(conf)
//1.建立RDD
val rdd: RDD[Int] = sc.parallelize(List(5,6,4,7,3,9,2,10,8))
//1.周遊RDD中每一個元素并進行操作 參數是一個處理RDD中資料的函數
val rdd2: RDD[Int] = rdd.map(_ * 2)
//行動算子 collect 将RDD中資料轉換成一個集合進行存儲
println(rdd2.collect().toBuffer)
2.filter
參數(function)
過濾操作,滿足filter内function函數為true的RDD内所有元素組成一個新的資料集。如:filter(a == 1)。
3.flatMap
參數(function)
map是對RDD中元素逐一進行函數操作映射為另外一個RDD,而flatMap操作是将函數應用于RDD之中的每一個元素,将傳回的疊代器的所有内容構成新的RDD。
flatMap與map差別在于map為“映射”,而flatMap“先映射,後扁平化”,map對每一次(func)都産生一個元素,傳回一個對象,而flatMap多一步就是将所有對象合并為一個對象。
//3.周遊RDD中每一個元素并将元素進行扁平化處理
val rdd4 = sc.parallelize(Array("a b c","b c d"))
//得到集合中集合的效果 而非集合中存儲資料
//val rdd5: RDD[Array[String]] = rdd4.map(_.split(" "))
val rdd5: RDD[String] = rdd4.flatMap(_.split(" "))
println(rdd5.collect().toBuffer)
4.mapPartitions
參數(function)
區于foreachPartition(屬于Action,且無傳回值),而mapPartitions可擷取傳回值。與map的差別前面已經提到過了,但由于單獨運作于RDD的每個分區上(block),是以在一個類型為T的RDD上運作時,(function)必須是Iterator => Iterator類型的方法(入參)。
//周遊出集合中每一個元素并進行操作 //建立了一個rdd1,資料作用在3個分區上
val rdd1 =sc.parallelize(List(1,2,3,4,5,6),3)
/*
mapPartitions是對每一個分區中資料進行疊代(資料處理 操作)
f: Iterator[T] => Iterator[U] 第一個參數疊代器是對象[分區對象] preservesPartitioning: Boolean = false 預設值 是否保留父RDD的分區資訊(此值不傳
入)
mapPartitions 和 map算子 如果是一個分區周遊的效果是沒有什麼差別的
但是建議在實際開發中使用mapPartitions算子代替Map,可以加載資料的處理
ps:如果RDD中資料量過大比如10億條,此時堅決不要使用mapPartitions進行資料周遊,可能會
出現OOM記憶體溢出 */
rdd1.mapPartitions(_.map(_*10))
//解釋:第一個下滑線代表的是對應的分區即rdd有幾個分區,下劃線就會被執行多少次
//例如: 分區是3 --> 資料可能會被分為 1,2 一個分區 3,4 一個分區 5,6 一個分區 此時_ 代表的就是每一個分區對象
// 第二個下劃線代表的是每一個分區内部的具體資料
//例如: 第一個下劃線觸發了第一個分區 得到資料是 1,2 第二個下劃線就有相當于是 1*10 2*10
5.mapPartitionsWithIndex
參數(function)
與mapPartitions類似,但需要提供一個表示分區索引值的整型值作為參數,是以function必須是(int, Iterator)=>Iterator類型的。
//mapPartitionsWithIndex 是對rdd中每個分區的周遊操作
//本質操作和mapPartitions類似,隻不過,可以更加具體的是對資料進行輸出
/**
* f: (Int, Iterator[T]) => Iterator[U] 第一個參數是分區的疊代方式,疊代方式有兩 個值Int是分區值(第幾分區) Iterator代表的是分區内部具體的值
值不傳
* preservesPartitioning: Boolean = false 是否保留父RDD的Partition資訊 預設 */
val Iter = (index:Int,iter:Iterator[Int])=>{
iter.map(x => {"[partID"+index+",value"+x+"]"})
}
val rdd3: RDD[String] = rdd1.mapPartitionsWithIndex(Iter)
println(rdd3.collect.toList)
6.sample
參數(withReplacement, fraction, seed)
采樣操作,用于從樣本中取出部分資料。withReplacement是否放回,fraction采樣比例,seed用于指定的随機數生成器的種子。(是否傳回抽樣分true和false,fraction取樣比例為(0, 1]。seed種子為整型實數。)
//4.sample随機抽樣
val rdd5_1 = sc.parallelize(1 to 10)
/**
* withReplacement: Boolean第一參數抽出資料是否傳回 true為傳回 false不傳回
* fraction: Double 第二參數 抽樣的比例 30% 傳入0.3即可,但是這個是浮動值 不一定準确
* seed: Long = Utils.random.nextLong 産生一個随機的種子 */
val sample: RDD[Int] = rdd5_1.sample(false,0.5)
println(sample.collect.toBuffer)
7.union
參數(otherDataSet)
對于源資料集和其他資料集求并集,不去重。
8.intersection
參數(otherDataSet)
對于源資料集和其他資料集求交集,并去重,且無序傳回。
9.distinct
參數([numTasks])
這個numTasks并不跟分區有關系,而可以了解為一個數學概念中的“因子”。如果設定的numTasks能被資料集中元素整除,那麼排序就按先無序的排因子,後無序排非因子的組合(即相當于局部無序);如果設定的numTasks不能被資料集中所有元素整除,那麼排序會按照去重之前RDD排序的順序傳回。
注:之後groupByKey、reduceByKey、aggregateByKey、sortByKey、join、cogroup等Transformation操作均包含[numTasks]任務數這個參數,參考上一行連結了解。
傳回一個在源資料集去重之後的新資料集,即去重,并局部無序而整體有序傳回。
//5.union求并集 intersection 交集
val rdd6 = sc.parallelize(List(5,6,7,8))
val rdd7 = sc.parallelize(List(1,2,5,6))
println( (rdd6 union rdd7).collect.toBuffer)
println( (rdd6 intersection rdd7).collect.toBuffer)
//6.去除重複 distinct
val rdd9 = sc.parallelize(List(1,2,2,3,4,5,2,6,9))
println(rdd9.distinct.collect.toBuffer)
10.groupByKey
([numTasks])
在一個PairRDD或(k,v)RDD上調用,傳回一個(k,Iterable)。主要作用是将相同的所有的鍵值對分組到一個集合序列當中,其順序是不确定的。groupByKey是把所有的鍵值對集合都加載到記憶體中存儲計算,若一個鍵對應值太多,則易導緻記憶體溢出。
//分組
//第一種 是根據傳入指定的參數進行分組
val rdd11_4_1: RDD[((String, Int), Iterable[((String, Int), (String,
Int))])] = rdd11_3.groupBy(_._1)
//第二種 根據key進行分組(需要是一個對偶元組),根據key進行分組
val rdd11_4_2: RDD[((String, Int), Iterable[(String, Int)])] =
rdd11_3.groupByKey()
11.reduceByKey
(function,[numTasks])
與groupByKey類似,卻有不同。如(a,1), (a,2), (b,1), (b,2)。groupByKey産生中間結果為( (a,1), (a,2) ), ( (b,1), (b,2) )。而reduceByKey為(a,3), (b,3)。
reduceByKey主要作用是聚合,groupByKey主要作用是分組。(function對于key值來進行聚合)
//求和
//除了XXXBykey之外,還有一個求和的算子,不此算子是action算子 reduce
//1.reduceBykey将相同key 為一組進行聚合,得到最終結果集
val rdd6 = sc.parallelize(Array(("tom",1),("jerry",3),("kitty",2),
("jerry",2)))
val sumed: RDD[(String, Int)] = rdd6.reduceByKey(_+_)
/**
觸發原則: reduceByKey會尋找相同的key,然後進行相加求和計算 第一次 第一個下劃線代表對應key中value的值
第二個下劃線代表對應key中value的值
然後求和
後續求和 : 第一個下劃線會擷取上一次計算的結果
第二個下劃線會擷取key中對應value的值 直到RDD沒有相同key時,此時reduceBykey結束一次計算 */
12.aggregateByKey
(zeroValue)(seqOp, combOp, [numTasks])
( zeroValue: U )( seqOp: (U, V) => U , combOp: (U, U) => U )
zeroValue預設參數值即初始值 會參與到分區中計算
seqOp 局部聚合(求分區内部的資料和)
combOp 全局聚合(求所有分區資料聚合後的求和)
類似reduceByKey,對pairRDD中相同的key值進行聚合操作
在kv對中RDD按照key将value進行分組合并,合并的時将會每個value和初始值作為seqOp函數參數
進行使用,進行計算的同時,會傳回一個結果 作為一個新的kv對存在,然後将結果再按照key進行合并,最後,将每個分區中value傳遞給combOp函數在此進行計算(現将前兩value值進行計算,然後傳回 結果和下一個value再進行計算 以此類推)
val rdd8 = sc.parallelize(List(("cat",2),("cat",5),("pig",10),("dog",3),
("dog",4),("cat",4)),2)
def func[T](index:Int,iter: Iterator[(T)])={
iter.map(x=>{"[partID:"+index+" value:"+x})
}//寫一個函數将分區中資料周遊出來
println(rdd8.mapPartitionsWithIndex(func).collect.toList)
val value: RDD[(String, Int)] = rdd8.aggregateByKey(0)(math.max(_,_),_+_)
println(value.collect().toBuffer)// ArrayBuffer((dog,4), (pig,10), (cat,9))
分析:
//partID:0 value:(cat,2), partID:0 value:(cat,5), partID:0 value:(pig,10) -->分區一
// --> aggregateByKey分區計算 得到的結果是( (cat ,5),(pig,10))
// partID:1 value:(dog,3), partID:1 value:(dog,4), partID:1 value:(cat,4)) --> 分區二
// --> aggregateByKey分區計算 得到的結果是( (dog ,4),(cat,4))
//全局聚合會将所有分區中得到的結果看做是一個分區即 ((cat ,5),(pig,10),(dog ,4), (cat,4))
// 相同key為一組 觸發一次計算 (cat ,9) (pig,10) ,(dog ,4)
12_1.combineByKey
/**
* 求和
* combineByKey 相同key為一組,把value合并計算
* createCombiner: V => C 會周遊分區中所有的元素,是以每個元素的key要麼是遇到(等 同于是相同key)要麼就是沒有遇到過的(等同于是不同的key)
* 如果這個元素是一個新的元素,使用createCombiner()函數 對這個可以進行第一次的初始累加
* mergeValue: (C, V) => C 如果是第一次處理目前分區中遇到的key,它會使用 mergeVlaue進行對該值的合并
mergeCombiners: (C, C) => 由于每個分區都是獨立處理,因為對于同一個key可以進行多個累加
ps:使用方式類似aggregateByKey,但是第一個參數是周遊RDD中每元素,剩餘的求和位置是相同的
combineByKey不允許使用 _ 代表 參數
*/
val rdd9 = sc.parallelize(List(("cat",2),("cat",5),("pig",10),("dog",3),
("dog",4),("cat",4)),2)
val rdd9_1 = rdd9.combineByKey(x=>x,(a:Int,b:Int)=>a+b,
(m:Int,n:Int)=>m+n)
println(rdd9_1.collect.toList)
13.sortByKey
([ascending], [numTasks])
排序算子,除了spark中内置排序算子之外,可以先将RDD通過collect算子轉換為Scala版本的集合,然後 再進行排序,再對資料轉換RDD
同樣是基于pairRDD的,根據key值來進行排序。ascending升序,預設為true,即升序;numTasks
//1.sortByKey 根據key進行排序,但是key必須具備可比較性,必須實作Ordered特質
val rdd4 = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
//說明: 是有一個參數的, 這個參數可以進行排序是升序還是降序,預設是true即升序, 降序false val sorted: RDD[(Int, String)] = rdd4.sortByKey()
println(sorted.collect.toList)
//2.sortBy和Scala中sortBy是不一樣的
//spark中sortBy是可以自定義升降序排序, scala中是不可以的
//sortBy比sortBykey更加靈活,sortBy可以根據第一個參數決定誰進行排序 val rdd5 = sc.parallelize(List(1,5,2,6,3,6,32,345,723,34))
val sorted2: RDD[Int] = rdd5.sortBy(x=>x,false)
println(sorted2.collect.toList)
14.join
(otherDataSet,[numTasks])
加入一個RDD,在一個(k,v)和(k,w)類型的dataSet上調用,傳回一個(k,(v,w))的pair dataSet。
//join 相同的key會合并, 相當于相同key為一組 value進行聚和(聚和不是求和,而是将值放到 一起)
val rdd10_1 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))
val rdd10_2 = sc.parallelize(List(("jerry",2),("tom",2),("dog",10)))
val rdd10_3 = rdd10_1 join rdd10_2
println(rdd10_3.collect.toBuffer)//ArrayBuffer((tom,(1,2)), (jerry,(3,2)))舍棄不同key的情況
14_1.leftOuterJoin、rightOuterJoin
//左連接配接和右連接配接 除了基準值之外 都是Option類型,因為值可能存在空值即null
//左連接配接是以左邊為基準值 右邊沒有的為null
rdd10_1 leftOuterJoin rdd10_2
//右連接配接是以右邊為基準值,左變沒有的為null
rdd10_1 rightOuterJoin rdd10_2
15.cogroup
(otherDataSet,[numTasks])
合并兩個RDD,生成一個新的RDD。執行個體中包含兩個Iterable值,第一個表示RDD1中相同值,第二個表示RDD2中相同值(key值),這個操作需要通過partitioner進行重新分區,是以需要執行一次shuffle操作。(若兩個RDD在此之前進行過shuffle,則不需要)
//這種方式和groupByKey類似,根據相同key為一組進行分組
//不同點,groupBykey必須是對偶元組,若資料沒有形成對偶元組,必須進行合并
//cogroup對資料并不需要合并的前提下就可以根據相同key進行分組
//ps:cogroup本質還是進行了合并
val rdd11_4_3: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11_1
cogroup rdd11_2
println(rdd11_4_3.collect().toBuffer)
//ArrayBuffer((tom,(CompactBuffer(1),CompactBuffer(2))), (dog,(CompactBuffer(),CompactBuffer(10))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))
16.cartesian
(otherDataSet)
求笛卡爾乘積。該操作不會執行shuffle操作。
//産生笛卡爾積cartesian
val rdd11_1 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))
val rdd11_2 = sc.parallelize(List(("jerry",2),("tom",2),("dog",10)))
val rdd11_3 = rdd11_1 cartesian rdd11_2
println(rdd11_3.collect.toBuffer)
17.pipe
(command,[envVars])
通過一個shell指令來對RDD各分區進行“管道化”。通過pipe變換将一些shell指令用于Spark中生成的新RDD,如:
val rdd: RDD[Int] = sc.parallelize(0 to 7 , 4)
val rdd1: RDD[Array[Int]] = rdd.glom()
val rdd2: RDD[String] = rdd.pipe("head -n 1")
println(rdd1.collect().toBuffer)//ArrayBuffer([[email protected], [[email protected], [[email protected], [[email protected])
//内部是數組:ArrayBuffer(Array(0,1),Array(2,3),Array(4,5),Array(6,7))
println(rdd2.collect().toBuffer)//ArrayBuffer(0, 2, 4, 6)
//截取每個分區中第一個元素作為新的RDD
18.coalesce
(numPartitions)
重新分區,**減少**RDD中分區的數量到numPartitions。
/**
* coalesce 更改分區
* 不能将小分區修改為大分區,因為不會發生shuffle
* 例如 原始分區是3 新分區是5 結果不會将資料存儲到新分區中 而是使用原始分區 --> 不
會發生shuffle *縮減分區,将原有分區進行縮減
*/
val reps2 = rdd6.coalesce(5)
println("通過coalesce調整分區後的分區數量:"+reps1.partitions.length) println(reps2.mapPartitionsWithIndex(Iter).collect().toList)
19.repartition
(numPartitions)
repartition是coalesce接口中shuffle為true的簡易實作,即Reshuffle RDD并随機分區,使各分區資料量盡可能平衡。若分區之後分區數遠大于原分區數,則需要shuffle。
//第二參數可以指定分區值 --> 值是幾 分區就會分幾個
val rdd6= sc.parallelize(List(1,2,3,4,5,6),3)
println("初始化分區:"+rdd6.partitions.length)
/**
*若使用repartition分區需要注意 如果從少量分區改變為多分區此時會發生shuffle * 例如 原有分區數量是3 修改後分區數量是5 -> 重新發生shuffle
*/
println(rdd6.mapPartitionsWithIndex(Iter).collect().toList)
val reps1: RDD[Int] = rdd6.repartition(5)
println("通過repartition調整分區後的分區數量:"+reps1.partitions.length)
println(reps1.mapPartitionsWithIndex(Iter).collect().toList)
20.repartitionAndSortWithinPartitions
(partitioner)
該方法根據partitioner對RDD進行分區,并且在每個結果分區中按key進行排序。
/**
* repartition算子的一種變種 ,這個算子可以對分區内部的資料進行排序
* repartitionAndSortWithinPartitions 代用必須是二進制組(對偶元組)
* 參數可以是自定義分區 也可以是系統分區HashPartitioner
*官方建議:若果使用repartition算子從新分區之後,還需要排序,此時建議使用 repartitionAndSortWithinPartitions算子進行計算
* 該算子會在重新分區的同時進行排序(shuffle+sort)
* 如果單純使用repartition 在sort 就相當于是先shuffle完成後在sort */
rdd6_1.repartitionAndSortWithinPartitions(new
HashPartitioner(1)).foreach(println)
Action:
1.reduce
(function)
reduce将RDD中元素兩兩傳遞給輸入函數,同時産生一個新值,新值與RDD中下一個元素再被傳遞給輸入函數,直到最後隻有一個值為止。
object TestAction {
def main(args: Array[String]): Unit = {
//1.reduce算子是屬于Action //此算子求和比較單一隻能使用在純數字計算的方式 val conf = new
SparkConf().setAppName("TestTransformation2").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List(1,2,3,4,5,6))
val sum: Int = rdd.reduce(_+_)
println(sum)
2.collect()
将一個RDD以一個Array數組形式傳回其中的所有元素。
//以數組的形式傳回RDD中存儲資料 因為傳回的是Array不可變的是以需要轉換為 .toBuffer
//或.toList來檢視結果
println(rdd.collect().toBuffer)
3.count()
傳回資料集中元素個數,預設Long類型。
//傳回RDD中元素的個數
val count: Long = rdd.count()
4.first()
傳回資料集的第一個元素(類似于take(1))
//擷取RDD中第一個元素的值 和 take(1)效果是一樣的
println(rdd.first())
5.takeSample
(withReplacement, num, [seed])
對于一個資料集進行随機抽樣
num: 傳回一個包含num個随機抽樣元素的數組
withReplacement: 表示是否有傳回抽樣
seed: 指定生成随機數的種子。
該方法僅在預期結果數組很小的情況下使用,因為所有資料都被加載到driver端的記憶體中。
6.take(n)
傳回一個包含資料集前n個元素的數組(從0下标到n-1下标的元素),不排序。
println(rdd.take(3).toBuffer)
7.takeOrdered
(n,[ordering])
傳回RDD中前n個元素,并按預設順序排序(升序)或者按自定義比較器順序排序。
println(rdd.takeOrdered(3).toBuffer)
8.saveAsTextFile(path)
将dataSet中元素以文本檔案的形式寫入本地檔案系統或者HDFS等。Spark将對每個元素調用toString方法,将資料元素轉換為文本檔案中的一行記錄。
若将檔案儲存到本地檔案系統,那麼隻會儲存在executor所在機器的本地目錄。
//将結果寫成資料檔案(存儲到HDFS或存儲到本地)
rdd.saveAsTextFile("檔案存儲路徑即可")
9.saveAsSequenceFile
(path)(Java and Scala)
将dataSet中元素以Hadoop SequenceFile的形式寫入本地檔案系統或者HDFS等。(對pairRDD操作)
10.saveAsObjectFile
(path)(Java and Scala)
将資料集中元素以ObjectFile形式寫入本地檔案系統或者HDFS等。
11.countByKey()
用于統計RDD[K,V]中每個K的數量,傳回具有每個key的計數的(k,int)pairs的hashMap。
//統計key的個數 生成的是map key值是key的名稱 value是key的個數
val rdd2 = sc.parallelize(List(("key1",1),("key2",2)))
val stringToLong1: collection.Map[String, Long] = rdd2.countByKey()
11_1.countByValue()
//統計元組的個數(統計value的個數,将集合中的每一個元素看做是一個value) val tupleToLong2: collection.Map[(String, Int), Long] =
rdd2.countByValue()
11_2.filterByRange()
//過濾對應RDD中元素指定範圍内的資料 --> 轉換算子
val rdd3 = sc.parallelize(List(("e",1),("c",2),("d",4),("c",5),("a",1))) val rdd3_1: RDD[(String, Int)] = rdd3.filterByRange("c", "e")// 包括開始和結束
--> 轉換算子
11_3.flatMapValue()
//對偶元組,value值進行扁平化處理 --> 轉換算子
val rdd4 = sc.parallelize(List(("a","1 2"),("b","3 4")))
val rdd4_1: RDD[(String, String)] = rdd4.flatMapValues(_.split(" "))
12.foreach
(function)
對資料集中每一個元素運作函數function。
//列印分區資料 /**
* 1.Action算子 直接列印RDD中資料,在列印的同時可以對資料一些操作 */
val rdd5 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3) rdd5.foreach(println) //直接列印RDD中資料 rdd5.foreach(x=>{
if(x>2)
println(x)
}) //對RDD中資料進行操作後輸出
12_1.foreachPartition
/*
Action算子, 直接列印RDD中對應分區的資料,多用于數資料的持久化(寫成檔案或存儲到資料庫) 多用于将資料存儲到資料庫中
*/
rdd5.foreachPartition(println) //直接列印RDD的資料
rdd5.foreachPartition(x=>{println(x.reduce(_+_))}) //可以對分區中資料進行列印
/**
* foreach和foreachPartition差別
* 1.在隻有一個分區的前提下foreach和foreachPartition列印的效果是一樣的
* 2.若需要列印多個分區中的值建議使用foreachPartition因為可以列印對應分區資料,
* 3.foreachPartition使用方式比foreach要多,因為foreachPartition可以列印分區适用于
将資料固化即寫成檔案或輸出到資料庫中
*/
12_2.KeyBy
(function)
KeyBy :以傳入的函數傳回值作為key,RDD中的元素作為value 形成新的元組
val rdd6 = sc.parallelize(List("dog","cat","pig")) //keyBy函數時可以周遊RDD中元素
val rdd6_1: RDD[(Int, String)] = rdd6.keyBy(_.length) //RDD中元素的長度是key
value是元素值
//keys 和 values 擷取所有的key 或 擷取所有的value collectAsMap 對偶元組轉換
為Map