天天看點

必讀|spark的重分區及排序

必讀|spark的重分區及排序

浪尖 浪尖聊大資料

昨天說了,mapPartitions 的使用技巧。大家應該都知道mapPartitions值針對整個分區執行map操作。而且對于PairRDD的分區預設是基于hdfs的實體塊,當然不可分割的話就是hdfs的檔案個數。但是我們也可以給partitionBy 算子傳入HashPartitioner,來給RDD進行重新分區,而且會使得key的hashcode相同的資料落到同一個分區。

spark 1.2之後引入了一個高品質的算子repartitionAndSortWithinPartitions 。該算子為spark的Shuffle增加了sort。假如,後面再跟mapPartitions算子的話,其算子就是針對已經按照key排序的分區,這就有點像mr的意思了。與groupbykey不同的是,資料不會一次裝入記憶體,而是使用疊代器一次一條記錄從磁盤加載。這種方式最小化了記憶體壓力。

repartitionAndSortWithinPartitions 也可以用于二次排序。

import org.apache.spark.Partitioner
 class KeyBasePartitioner(partitions: Int) extends Partitioner {

   override def numPartitions: Int = partitions

   override def getPartition(key: Any): Int = {
     val k = key.asInstanceOf[Int]
     Math.abs(k.hashCode() % numPartitions)
   }
 }

 import org.apache.spark.SparkContext._
     sc.textFile("file:///opt/hadoop/spark-2.3.1/README.md").flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_+_).map(each=>(each._2,each._1))
     implicit val caseInsensitiveOrdering = new Ordering[Int] {
      override def compare(a: Int, b: Int) = b.compareTo(a)
     }
     // Sort by key, using 
 res7.repartitionAndSortWithinPartitions(new KeyBasePartitioner(3)).saveAsTextFile("file:///opt/output/")           
mdhdeMacBook-Pro-3:output mdh$ pwd
/opt/output
mdhdeMacBook-Pro-3:output mdh$ ls
_SUCCESS        part-00000      part-00001      part-00002
mdhdeMacBook-Pro-3:output mdh$ head -n 10 part-00000 
(24,the)
(12,for)
(9,##)
(9,and)
(6,is)
(6,in)
(3,general)
(3,documentation)
(3,example)
(3,how)
mdhdeMacBook-Pro-3:output mdh$ head -n 10 part-00001
(16,Spark)
(7,can)
(7,run)
(7,on)
(4,build)
(4,Please)
(4,with)
(4,also)
(4,if)
(4,including)
mdhdeMacBook-Pro-3:output mdh$ head -n 10 part-00002
(47,)
(17,to)
(8,a)
(5,using)
(5,of)
(2,Python)
(2,locally)
(2,This)
(2,Hive)
(2,SparkPi)
mdhdeMacBook-Pro-3:output mdh$