sprak Core中比較常用的排序方法sortBy和sortKByKey,這是一個shuffle類算法子,寬依賴,出發DAGSchedular劃分Stage,那麼他們排序的原理是啥呢?
第一步Stage0:
分區采樣Sample,建立RangePartitioner,先對輸入的資料的key做采樣,來估算Key的分布,然後按照指定的排序切分range,盡量讓每個partition對應的range裡的key分布均勻,計算出一個RangeBounds,長度為partitions - 1的Array[Key],裡面存放的資料為前partition - 1分區中每個分區的key的上界,最後一個partition就是存放剩餘的其他資料
此時的采樣大小:
//此處确定采樣大小,前提假設資料分區大緻平衡,即不存在傾斜的情況
// 總體采樣數量
val sampleSize = math.min(20.0 * partitions, 1e6)
// 每個分區采樣數量
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
// 資料采樣
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
// 此方法資料采樣 傳回的資料
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2).sum
(numItems, sketched)
}
如果一個分區包含的項遠遠超過了平均數,我們将從中重新采樣以確定從該分區收集足夠的項。
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {
Array.empty
} else {
// 采樣的資料占整體的比例
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
val candidates = ArrayBuffer.empty[(K, Float)]
val imbalancedPartitions = mutable.Set.empty[Int]
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx
} else {
val weight = (n.toDouble / sample.length).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {
// 以期望的采樣機率重新采樣不平衡的分區
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
RangePartitioner.determineBounds(candidates, partitions)
}
第二步:shuffle write
開始shuffle在map side做shuffle write,每個計算節點根據前面計算出來的rangeBounds對輸入資料重新分片,分片采用rangePartitioner 使得重新分區後的資料在分區之間隻排好序的,
然後此時分區内的資料不一定是已經排好序
第三步:shuffle read
在reducer side,對每個分區内的資料做排序
這樣完成之後,partition之間的資料在map side就保證有序,而每個分區内的資料在reducer side也保證有排序,進而達到了全局排序的效果
大緻流程圖如下:
此時引發一個問題:我相信很多小夥伴在面試的時候都遇到過這個問題:給定1T的檔案和隻有1G記憶體的機器,如何實作全排序?
1,産生随機數,生成10個測試檔案,10個線程同時進行。
2,将大檔案分割1BM的小檔案,每個線程對分割而成的内容進行内部排序後,寫入檔案,利用自定義阻塞線程池,每次同時寫入3~4個檔案。
3,将所有小檔案排序後,利用多路排序算法将小檔案寫入最終檔案。
如果可以的話是否也可采用類似spark排序的的原理呢?歡迎交流!