天天看點

spark 排序實作原理 RangePartitioner

sprak Core中比較常用的排序方法sortBy和sortKByKey,這是一個shuffle類算法子,寬依賴,出發DAGSchedular劃分Stage,那麼他們排序的原理是啥呢?

第一步Stage0:

分區采樣Sample,建立RangePartitioner,先對輸入的資料的key做采樣,來估算Key的分布,然後按照指定的排序切分range,盡量讓每個partition對應的range裡的key分布均勻,計算出一個RangeBounds,長度為partitions - 1的Array[Key],裡面存放的資料為前partition - 1分區中每個分區的key的上界,最後一個partition就是存放剩餘的其他資料

此時的采樣大小:

//此處确定采樣大小,前提假設資料分區大緻平衡,即不存在傾斜的情況
// 總體采樣數量
val sampleSize = math.min(20.0 * partitions, 1e6)
// 每個分區采樣數量
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
// 資料采樣
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
// 此方法資料采樣 傳回的資料 
def sketch[K : ClassTag](
   rdd: RDD[K],
   sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
 val shift = rdd.id
 // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
 val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
   val seed = byteswap32(idx ^ (shift << 16))
   val (sample, n) = SamplingUtils.reservoirSampleAndCount(
     iter, sampleSizePerPartition, seed)
   Iterator((idx, n, sample))
 }.collect()
 val numItems = sketched.map(_._2).sum
 (numItems, sketched)
}
如果一個分區包含的項遠遠超過了平均數,我們将從中重新采樣以確定從該分區收集足夠的項。
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // 采樣的資料占整體的比例
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // 以期望的采樣機率重新采樣不平衡的分區
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, partitions)
      }
           

第二步:shuffle write

開始shuffle在map side做shuffle write,每個計算節點根據前面計算出來的rangeBounds對輸入資料重新分片,分片采用rangePartitioner 使得重新分區後的資料在分區之間隻排好序的,

然後此時分區内的資料不一定是已經排好序

第三步:shuffle read

在reducer side,對每個分區内的資料做排序

這樣完成之後,partition之間的資料在map side就保證有序,而每個分區内的資料在reducer side也保證有排序,進而達到了全局排序的效果

大緻流程圖如下:

spark 排序實作原理 RangePartitioner

此時引發一個問題:我相信很多小夥伴在面試的時候都遇到過這個問題:給定1T的檔案和隻有1G記憶體的機器,如何實作全排序?

1,産生随機數,生成10個測試檔案,10個線程同時進行。

2,将大檔案分割1BM的小檔案,每個線程對分割而成的内容進行内部排序後,寫入檔案,利用自定義阻塞線程池,每次同時寫入3~4個檔案。

3,将所有小檔案排序後,利用多路排序算法将小檔案寫入最終檔案。

如果可以的話是否也可采用類似spark排序的的原理呢?歡迎交流!

繼續閱讀