
Spark Core: RDD Transformation Operators

Table of Contents

1. map vs mapPartitions

2. filter

3. zip (requires equal element counts per partition and equal partition counts)

4. Difference, union, and intersection

5. distinct

6. Sorting

7. reduceByKey vs groupByKey

8. join: inner, left, right, and full joins

9. Repartitioning

1. map vs mapPartitions

map: applied to each element; number of iterations ==> number of elements.

mapPartitions: applied to each partition; number of iterations ==> number of partitions.

==> Therefore, for per-record setup such as creating database connections or other heavyweight objects, prefer mapPartitions (see the sketch after the code below).

mapPartitionsWithIndex: additionally exposes the partition index.

val rdd = sc.parallelize(List(1, 2, 3), 2)

rdd.map(x => {
  println("map iteration: ----------") // executed three times, once per element
}).collect()

rdd.mapPartitions(partition => {
  println("mapPartitions iteration: -----------------") // executed twice, once per partition
  partition.filter(_ < 0)
}).collect()

rdd.mapPartitionsWithIndex((index, partition) => {
  partition.map(x => println("partition " + index + ": " + x)) // print each element with its partition index
}).collect()
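
A minimal sketch of the connection-per-partition pattern recommended above, assuming hypothetical getConnection()/save() helpers (placeholders, not a real API). Note that the iterator must be materialized before the connection is closed, because mapPartitions is lazy:

rdd.mapPartitions { partition =>
  val conn = getConnection() // hypothetical helper; runs once per partition, not once per element
  val result = partition.map { x =>
    save(conn, x) // hypothetical helper; reuses the same connection for every element
    x
  }.toList // force evaluation before the connection is closed
  conn.close()
  result.iterator
}.collect()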
           

2. filter

For multiple conditions, rdd.filter(x).filter(y)... is equivalent to a single filter(x && y && ...):

val rdd = sc.parallelize(List(1, 2, 3))
rdd.filter(_ % 2 != 0).filter(_ > 1).foreach(println(_)) 
rdd.filter(x => x % 2 != 0 && x > 1).foreach(println(_)) 
           

3. zip (requires equal element counts per partition and equal partition counts)

val rdd1 = sc.parallelize(List(1, 2, 3))
val rdd2 = sc.parallelize(List("a", "b", "c"))

rdd1.zip(rdd2).foreach(println(_))
// element counts differ ==> Can only zip RDDs with same number of elements in each partition
val rdd3 = sc.parallelize(List("a", "b", "c", "d"))
rdd1.zip(rdd3).foreach(println(_))
// partition counts differ ==> Can't zip RDDs with unequal numbers of partitions: List(1, 3)
val rdd4 = sc.parallelize(List("a", "b", "c"), 3)
rdd1.zip(rdd4).foreach(println(_))

rdd1.zipWithIndex().collect() // equivalent to rdd.zip(List(0, 1, 2, ...)) ==> assigns each element an index
           

4. Difference, union, and intersection

val rddLeft = sc.parallelize(List(1, 2, 3, 4, 5))
val rddRight = sc.parallelize(List(4, 5, 6, 7, 8, 8))
rddLeft.union(rddRight).collect() // union: no shuffle, input partitions are preserved
rddLeft.intersection(rddRight).collect() // intersection: implemented via an inner join internally
rddLeft.subtract(rddRight).collect() // difference
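
A quick check of the "partitions are preserved" claim, as a minimal sketch: union performs no shuffle, so the result's partition count is the sum of the inputs' partition counts.

// union does not shuffle: result partitions == rddLeft partitions + rddRight partitions
println(rddLeft.partitions.length + " + " + rddRight.partitions.length
  + " = " + rddLeft.union(rddRight).partitions.length) // e.g. 2 + 2 = 4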
           

5. distinct

How can you deduplicate without the distinct operator?

val rddRight = sc.parallelize(List(4, 5, 6, 7, 8, 8))
rddRight.distinct().collect() // deduplicate
// without distinct:
rddRight.map((_, null)).reduceByKey((x, y) => x).map(_._1).collect()
rddRight.map((_, null)).groupByKey().map(_._1).collect()
// how distinct is implemented in the Spark source:
// rdd.map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
           

6. Sorting

sortBy: works on any RDD.

sortByKey: works on key-value RDDs; it is made available through an implicit conversion.

Both sort ascending by default; pass ascending = false for descending order.

val rdd = sc.parallelize(List(7, 5, 7, 6, 8, 8))
rdd.sortBy(x => x).collect()

val kvrdd = sc.parallelize(List(("a", 12), ("c", 20), ("b", 1)))
kvrdd.sortByKey().collect()
// to sort kvrdd by value instead of by key:
kvrdd.sortBy(_._2).collect() // 1. use sortBy on the value
kvrdd.map(x => (x._2, x._1)).sortByKey().map(x => (x._2, x._1)).collect() // 2. swap key and value, sort by key, swap back

kvrdd.sortBy(_._2, false).collect() // descending via the ascending flag
kvrdd.sortBy(-_._2).collect() // descending by negating the key; concise, but numeric keys only
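
Since the negation trick only works for numeric sort keys, the ascending flag is the general route; a minimal sketch sorting by the String key in descending order:

kvrdd.sortBy(_._1, ascending = false).collect() // Array((c,20), (b,1), (a,12))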
           

7. reduceByKey vs groupByKey

reduceByKey: applies the function to the values of each key; it combines locally on each partition (map-side combine) before the shuffle.

groupByKey: groups the values by key; all elements are shuffled.

==> Both can implement a word count, but reduceByKey shuffles less data.

val rdd = sc.textFile("file:///opt/mydata/olddata/data1.txt") // file size: 4.0K

rdd.flatMap(_.split("\t")).map((_, 1))
  .groupByKey().mapValues(_.sum).collect() // Shuffle Read: 200 B

rdd.flatMap(_.split("\t")).map((_, 1))
  .reduceByKey(_ + _).collect() // Shuffle Read: 198.0 B
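
The file above is local to the author's machine; a self-contained variant of the same comparison, as a minimal sketch with inline data:

val lines = sc.parallelize(List("a b a", "b c"))
lines.flatMap(_.split(" ")).map((_, 1))
  .groupByKey().mapValues(_.sum).collect() // Array((a,2), (b,2), (c,1))
lines.flatMap(_.split(" ")).map((_, 1))
  .reduceByKey(_ + _).collect() // same result, with less shuffled data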
           

8. join: inner, left, right, and full joins

val rddLeft = sc.parallelize(List(("a", "hz"), ("b", "sh"), ("c", "bj")))
val rddRight = sc.parallelize(List(("a", 1), ("b", 2), ("d", 3)))

rddLeft.join(rddRight).collect() // inner join; built on cogroup internally
rddLeft.leftOuterJoin(rddRight).collect() // left outer join
rddLeft.rightOuterJoin(rddRight).collect() // right outer join
rddLeft.fullOuterJoin(rddRight).collect() // full outer join
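
Since join is built on cogroup, inspecting cogroup's raw output can make the four variants easier to picture; a minimal sketch (the printed shape below is illustrative):

rddLeft.cogroup(rddRight).collect()
// e.g. (a,(CompactBuffer(hz),CompactBuffer(1))), (c,(CompactBuffer(bj),CompactBuffer())), ...
// join keeps a key only when both sides are non-empty; the outer joins pad the empty side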
           

9. Repartitioning

coalesce(numPartitions: Int, shuffle: Boolean = false): by default only shrinks the partition count (no shuffle); pass shuffle = true to be able to increase it (with a shuffle).

repartition: internally calls coalesce(numPartitions, shuffle = true) ==> shuffles whether the partition count grows or shrinks.

val rdd = sc.parallelize(List(7, 5, 7, 6, 8, 8), 3)
println("original partition count ========> " + rdd.partitions.length)
rdd.mapPartitionsWithIndex((index, partition) => {
  partition.map(x => println(index + " ==> " + x)) // show which partition each element lands in
}).collect()
println(
  "coalesce to 2 ========> " +
    rdd.coalesce(2).partitions.length
)
println(
  "coalesce to 3 ========> " +
    rdd.coalesce(3, true).partitions.length
)
println(
  "repartition to 4 ========> " +
    rdd.repartition(4).partitions.length // internally calls coalesce(numPartitions, shuffle = true)
)
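
One more check, as a minimal sketch: without shuffle = true, coalesce cannot increase the partition count, so asking for more than the current 3 partitions is a no-op.

println(rdd.coalesce(5).partitions.length) // still 3: coalesce without shuffle can only shrink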
           
