目錄
1、map vs mapPartition
2、filter:過濾
3、zip:拉鍊 要求元素分數和分區數相同
4、差并交
5、distinct:去重
6、排序
7、reduceByKey vs groupByKey
8、join:連接配接:内連接配接、左連接配接、右連接配接、全連接配接
9、重分區
1、map vs mapPartition
map:作用于每一個元素,疊代次數==>元素數
mapPartition:作用于每一個分區,疊代次數==>分區數
==>是以,對于資料庫建立、對象建立等操作,優選mapPartition
mapPartitionWithIndex:傳回分區index
val rdd = sc.parallelize(List(1, 2, 3), 2)
rdd.map(x => {
println("map疊代次數:----------")//執行三次
}).collect()
rdd.mapPartitions(partition => {
println("mapPartition疊代次數:-----------------")//執行兩次
partition.filter(_ < 0)
}).collect()
rdd.mapPartitionsWithIndex((index, partition) => {
partition.map(x => println("分區規則:元素%partition" + index + ": " + x)) //列印出分區index
}).collect()
2、filter:過濾
對于多條件:rdd.filter(x).filter(y)...==>filter(x&&y&&...)
val rdd = sc.parallelize(List(1, 2, 3))
rdd.filter(_ % 2 != 0).filter(_ > 1).foreach(println(_))
rdd.filter(x => x % 2 != 0 && x > 1).foreach(println(_))
3、zip:拉鍊 要求元素分數和分區數相同
val rdd1 = sc.parallelize(List(1, 2, 3))
val rdd2 = sc.parallelize(List("a", "b", "c"))
rdd1.zip(rdd2).foreach(println(_))
//數量不一樣 Can only zip RDDs with same number of elements in each partition
val rdd3 = sc.parallelize(List("a", "b", "c", "d"))
rdd1.zip(rdd3).foreach(println(_))
//分區數不一樣 Can't zip RDDs with unequal numbers of partitions: List(1, 3)
val rdd4 = sc.parallelize(List("a", "b", "c"), 3)
rdd1.zip(rdd4).foreach(println(_))
rdd1.zipWithIndex().collect() //rdd.zip(List(0,1,2...)==> 打編号
4、差并交
val rddLeft = sc.parallelize(List(1, 2, 3, 4, 5))
val rddRight = sc.parallelize(List(4, 5, 6, 7, 8, 8))
rddLeft.union(rddRight).collect() //并集,不改變分區
rddLeft.intersection(rddRight).collect() //交集 底層調用的内連接配接
rddLeft.subtract(rddRight).collect() //差集
5、distinct:去重
如果不使用distinct算子如何去重?
val rddRight = sc.parallelize(List(4, 5, 6, 7, 8, 8))
rddRight.distinct().collect() //去重
//不使用distinct
rddRight.map((_, null)).reduceByKey((x, y) => x).map(_._1).collect()
rddRight.map((_, null)).groupByKey().map(_._1).collect()
//distinct源碼實作方式
//rdd.map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
6、排序
sortBy:作用于RDD
sortByKey:作用于KV類型RDD,通過隐式轉換調用
預設升序,降序傳參ascending=false
val rdd = sc.parallelize(List(7, 5, 7, 6, 8, 8))
rdd.sortBy(x => x).collect()
val kvrdd = sc.parallelize(List(("a", 12), ("c", 20), ("b", 1)))
kvrdd.sortByKey().collect()
//如果kvrdd想按v來排序,實作方式
kvrdd.sortBy(_._2).collect() //1、使用sortBy
kvrdd.map(x => (x._2, x._1)).sortByKey().map(x => (x._2, x._1)).collect() //使用map調換位置排序
kvrdd.sortBy(_._2, false).collect() //降序 預設方式
kvrdd.sortBy(-_._2).collect() //降序 推薦方式,針對于數值類型
7、reduceByKey vs groupByKey
reduceByKey:對相同key的value做fun,reduceByKey會先做本地combine,然後再shuffle
groupByKey:按key進行分組 ,groupByKeY所有的元素都會shuffle
==>都可以用于WC統計,但是reduceByKey的shuffle資料量會小一些
val rdd = sc.textFile("file:///opt/mydata/olddata/data1.txt") //file: 4.0K
rdd.flatMap(_.split("\t")).map((_, 1))
.groupByKey().mapValues(_.sum).collect() //Shuffle Read:200B
rdd.flatMap(_.split("\t")).map((_, 1))
.reduceByKey(_ + _).collect() //Shuffle Read:198.0 B
8、join:連接配接:内連接配接、左連接配接、右連接配接、全連接配接
val rddLeft = sc.parallelize(List(("a", "hz"), ("b", "sh"), ("c", "bj")))
val rddRight = sc.parallelize(List(("a", 1), ("b", 2), ("d", 3)))
rddLeft.join(rddRight).collect() //内連接配接 底層調用cogroup
rddLeft.leftOuterJoin(rddRight).collect() //左連接配接
rddLeft.rightOuterJoin(rddRight).collect() // 右連接配接
rddLeft.fullOuterJoin(rddRight).collect() //全連接配接
9、重分區
coalesce(numPartitions:Int,shuffle:Boolean=false):預設縮小分區(不shuffle),傳遞true可以增加分區(shuffle)
rePartition:底層調用coalesce(numPartition,true)==>變大變小都會執行shuffle
val rdd = sc.parallelize(List(7, 5, 7, 6, 8, 8), 3)
println("原始分區數========>" + rdd.partitions.length)
rdd.mapPartitionsWithIndex((index, partition) => {
partition.map(x => println(index + "==>" + x))
}).collect()
println(
"coalesce 設定為2========>" +
rdd.coalesce(2).partitions.size
)
println(
"coalesce 設定為3========>" +
rdd.coalesce(3, true).partitions.length
)
println(
"repartition 設定為4========>" +
rdd.repartition(4).partitions.length //底層調用coalesce(numPartitions,true)
)