感覺自己好久沒有更新過部落格了,本人最近有點兒迷失,特來寫篇技術部落格,以做自警
不知道大家有沒有注意到,大家在編寫spark程式調用sortBy/sortByKey這兩個算子的時候大家會不會有這樣子的疑問,他們兩個明明是transformation,為啥在執行的時候卻觸發了作業的執行呢?今天就和大家一起一探究竟?
val wordCountRdd = spark.sparkContext.textFile(path).
flatMap(_.split(" ")).
map(word => (word, 1)).
reduceByKey(_ + _)
val sortByCountDescRdd = wordCountRdd.sortBy(-_._2)
當你在shell輸入一下的code時,會發現如下圖:
出現了類似action的運作條,到底怎麼回事兒呢?
首先sortBy的實作就是調用了sortByKey,是以我們隻關注sortByKey的實作
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
此時先看RangePartitioner分區器的實作,隻挑重要部分開始描述啊
private[spark] object RangePartitioner {
/**
* Sketches the input RDD via reservoir sampling on each partition.
*
* @param rdd the input RDD to sketch
* @param sampleSizePerPartition max sample size per partition
* @return (total number of items, an array of (partitionId, number of items, sample))
*/
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2).sum
(numItems, sketched)
}
在這裡調用了RDD的collect action算子出發了作業的運作,其實此處的collection是對key進行采樣已确認key的分布情況,總之是在為做全局排序做準備。
想知道詳細的請檢視spark源碼執行吧