
Spark Core: exploring sortBy and sortByKey

It feels like ages since I last updated this blog. I've been a little adrift lately, so here's a technical post to keep myself honest.

Have you ever noticed that when a Spark program calls sortBy or sortByKey, a job actually runs, even though both operators are documented as transformations? Why would a transformation trigger job execution? Let's find out.

// `spark` is the SparkSession provided by spark-shell; `path` points at any text file
val wordCountRdd = spark.sparkContext.textFile(path).
  flatMap(_.split(" ")).
  map(word => (word, 1)).
  reduceByKey(_ + _)

// a transformation on paper, yet evaluating this line already launches a job
val sortByCountDescRdd = wordCountRdd.sortBy(-_._2)
           

When you run the code above in the spark-shell, you'll see something like the figure below:

[Figure: spark-shell output showing a job progress bar appearing as soon as sortBy is called]

A progress bar shows up, just as if an action had been executed. What is going on?
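If you want to confirm it isn't just a quirk of the shell's progress bar, here is a quick experiment you can run in the same session (a sketch, reusing wordCountRdd from above; listener events are delivered asynchronously, so the counter may lag for a moment):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Count every job submitted on this SparkContext
var jobsStarted = 0
spark.sparkContext.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = jobsStarted += 1
})

wordCountRdd.map(identity)               // an ordinary transformation: no job is submitted
val sorted = wordCountRdd.sortBy(-_._2)  // the "transformation" in question: one job runs
Thread.sleep(1000)                       // give the listener bus a moment to deliver the event
println(jobsStarted)                     // expected: 1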

First of all, sortBy is implemented on top of sortByKey, so sortByKey is the one we really need to look at.
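For reference, sortBy itself is just a thin wrapper (quoted from RDD.scala in Spark 2.x; the exact signature may differ slightly between versions). It keys the RDD by f, delegates to sortByKey, then drops the keys again:

def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values
}

And sortByKey, defined in OrderedRDDFunctions, looks like this: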

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  // constructing the RangePartitioner is what samples the data (and therefore runs a job)
  val part = new RangePartitioner(numPartitions, self, ascending)
  // the sort itself stays lazy: it happens during the shuffle of this ShuffledRDD
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
           

The key to the puzzle is the RangePartitioner, so let's look at how it works; I'll only walk through the important parts.
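The part that matters is the rangeBounds field computed in the RangePartitioner constructor. Here is an abridged paraphrase (based on the Spark 2.x source; the real code additionally re-samples partitions that turn out to be heavily imbalanced):

// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
  if (partitions <= 1) {
    Array.empty
  } else {
    // Aim for roughly 20 samples per output partition (capped at 1M keys overall),
    // over-sampling each input partition by 3x in case the data is unevenly spread.
    val sampleSize = math.min(20.0 * partitions, 1e6)
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
    // This is the line that launches a job: sketch() samples the keys of every partition
    // and collect()s the samples back to the driver.
    val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    if (numItems == 0L) {
      Array.empty
    } else {
      // Weight each sampled key by the inverse of its partition's sampling probability,
      // then pick (partitions - 1) evenly weighted keys as the partition boundaries.
      val candidates = ArrayBuffer.empty[(K, Float)]
      sketched.foreach { case (_, n, sample) =>
        val weight = (n.toDouble / sample.length).toFloat
        sample.foreach(key => candidates += ((key, weight)))
      }
      RangePartitioner.determineBounds(candidates, partitions)
    }
  }
}

And the sketch method it calls is shown below: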

private[spark] object RangePartitioner {

  /**
   * Sketches the input RDD via reservoir sampling on each partition.
   *
   * @param rdd the input RDD to sketch
   * @param sampleSizePerPartition max sample size per partition
   * @return (total number of items, an array of (partitionId, number of items, sample))
   */
  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }
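The heavy lifting inside sketch is done by SamplingUtils.reservoirSampleAndCount, which is classic reservoir sampling: keep a fixed-size, uniformly random sample of an iterator in a single pass, without knowing its length in advance. A minimal illustration of the idea (my own sketch, not the Spark implementation):

import scala.reflect.ClassTag
import scala.util.Random

// Returns (a uniform sample of at most k items, the total number of items seen)
def reservoirSketch[T: ClassTag](input: Iterator[T], k: Int, seed: Long): (Array[T], Long) = {
  val rng = new Random(seed)
  val reservoir = scala.collection.mutable.ArrayBuffer.empty[T]
  var n = 0L
  input.foreach { item =>
    n += 1
    if (reservoir.length < k) {
      reservoir += item                        // fill the reservoir with the first k items
    } else {
      val j = (rng.nextDouble() * n).toLong    // afterwards, keep each item with probability k / n
      if (j < k) reservoir(j.toInt) = item     // ...by overwriting a random slot
    }
  }
  (reservoir.toArray, n)
}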
           

The collect() at the end of sketch is a genuine RDD action, and that is what triggers the job you see in the shell. This collect isn't returning your data; it gathers key samples from every partition so that Spark can estimate the key distribution and choose the range boundaries. In short, it's preparation for the global sort.
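Note that this sampling job is the only work done eagerly. The shuffle and the sort itself remain lazy, so nothing more runs until you call an action on the sorted RDD, for example:

// the actual shuffle-and-sort only runs here, when an action is invoked
sortByCountDescRdd.take(10).foreach { case (word, count) => println(s"$word\t$count") }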

If you want the full details, dig into the Spark source and trace the execution yourself.
