A Code-Level Walkthrough of Spark's HashPartitioner and RangePartitioner
Partitioners
Overview diagram
They break down as follows:
- HashPartitioner and RangePartitioner, under org.apache.spark
- CoalescedPartitioner, under org.apache.spark.scheduler
- CoalescedPartitioner, under org.apache.spark.sql.execution
- GridPartitioner, under org.apache.spark.mllib.linalg.distributed
- PartitionIdPassthrough, under org.apache.spark.sql.execution
- PythonPartitioner, under org.apache.spark.api.python
That is seven partitioners in total; this article focuses on HashPartitioner and RangePartitioner under org.apache.spark. Note that partitioners only apply to operations on RDDs of (K, V) pairs.
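As a quick illustration of that restriction, here is a small sketch (assuming a local SparkContext named sc; the variable names are only for this example): partitionBy comes from PairRDDFunctions, so it is only available on RDDs of key-value pairs.

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))           // RDD[(String, Int)]
val nums  = sc.parallelize(Seq(1, 2, 3))                       // RDD[Int]

val repartitioned = pairs.partitionBy(new HashPartitioner(4))  // OK: (K, V) RDD
println(repartitioned.partitioner)                             // Some(org.apache.spark.HashPartitioner@...)
// nums.partitionBy(new HashPartitioner(4))                    // would not compile: nums is not a pair RDD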
Partitioner
Partitioner is an abstract class that defines the members every partitioner must provide:
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}
- numPartitions: returns the number of partitions.
- getPartition: maps a key to its partition ID.
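To see these two members in action, here is a hypothetical custom partitioner (not part of Spark; the class name and routing logic are made up for illustration) that sends even integer keys to partition 0 and odd ones to partition 1:

import org.apache.spark.Partitioner

class EvenOddPartitioner extends Partitioner {
  // Two fixed partitions: 0 for even keys, 1 for odd keys.
  override def numPartitions: Int = 2

  override def getPartition(key: Any): Int = key match {
    case i: Int => math.abs(i % 2)   // even -> 0, odd -> 1
    case _      => 0                 // anything else falls back to partition 0
  }

  // equals/hashCode let Spark recognize two instances as the same partitioning
  // and avoid unnecessary shuffles.
  override def equals(other: Any): Boolean = other.isInstanceOf[EvenOddPartitioner]
  override def hashCode: Int = numPartitions
}

It would be used like any built-in partitioner, e.g. pairRdd.partitionBy(new EvenOddPartitioner) on a pair RDD.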
The Partitioner class also has a companion object, whose job is to provide a default partitioner.
object Partitioner {
  /**
   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
   *
   * If any of the RDDs already has a partitioner, choose that one.
   *
   * Otherwise, we use a default HashPartitioner. For the number of partitions, if
   * spark.default.parallelism is set, then we'll use the value from SparkContext
   * defaultParallelism, otherwise we'll use the max number of upstream partitions.
   *
   * Unless spark.default.parallelism is set, the number of partitions will be the
   * same as the number of partitions in the largest upstream RDD, as this should
   * be least likely to cause out-of-memory errors.
   *
   * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
   */
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val rdds = (Seq(rdd) ++ others)
    val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
    if (hasPartitioner.nonEmpty) {
      hasPartitioner.maxBy(_.partitions.length).partitioner.get
    } else {
      if (rdd.context.conf.contains("spark.default.parallelism")) {
        new HashPartitioner(rdd.context.defaultParallelism)
      } else {
        new HashPartitioner(rdds.map(_.partitions.length).max)
      }
    }
  }
}
The defaultPartitioner method spells out the strategy for choosing the default partitioner: if any of the parent RDDs already has a partitioner, the partitioner of the parent RDD with the most partitions is reused. If none of the parent RDDs has a partitioner (for example, they are not pre-partitioned pair RDDs), a HashPartitioner is returned, and its partition count is determined in one of two ways: if spark.default.parallelism is set, that value is used; otherwise the largest partition count among the parent RDDs is used.
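A small sketch of how this choice plays out (assuming a local SparkContext named sc with spark.default.parallelism left unset; the RDD names are illustrative):

import org.apache.spark.{HashPartitioner, Partitioner}

// One pair RDD that is already partitioned, and one that is not.
val a = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(new HashPartitioner(8))
val b = sc.parallelize(Seq((1, "x"), (2, "y")), numSlices = 4)

// a already has a partitioner, so its partitioner wins.
println(Partitioner.defaultPartitioner(a, b).numPartitions)  // 8

// Neither c nor b has a partitioner and spark.default.parallelism is unset,
// so a new HashPartitioner sized to the largest parent (4 partitions) is returned.
val c = sc.parallelize(Seq((1, "x"), (2, "y")), numSlices = 2)
println(Partitioner.defaultPartitioner(c, b).numPartitions)  // 4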
HashPartitioner
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
This one is fairly simple: the partition ID is the key's hashCode taken modulo the number of partitions; if the remainder is negative, the number of partitions is added back, and the resulting value is the partition ID the key belongs to (a null key always maps to partition 0).
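The modulo-with-correction described above can be sketched as a standalone function (this mirrors what Utils.nonNegativeMod does; the function below is illustrative, not Spark's internal code):

// Java/Scala's % can return a negative value for negative hash codes,
// so the partition count is added back when that happens.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

println(nonNegativeMod(7, 3))   // 1
println(nonNegativeMod(-7, 3))  // 2, instead of -7 % 3 == -1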
RangePartitioner
class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }

  // ... (getPartition, equals, hashCode and the remaining members are omitted here)
}
I will not walk through every line here; the referenced article already does. The most important piece is the computation of the partition bounds, which uses reservoir sampling, an algorithm for drawing a fixed-size sample from a stream whose total size is not known in advance.
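To make that sampling step concrete, here is a minimal, self-contained sketch of reservoir sampling (illustrative only; Spark's own version, used by RangePartitioner.sketch, lives in SamplingUtils.reservoirSampleAndCount and also returns the total item count):

import scala.reflect.ClassTag
import scala.util.Random

// Keep a uniform random sample of size k from a stream whose length is unknown up front.
def reservoirSample[T: ClassTag](input: Iterator[T], k: Int, seed: Long = 42L): (Array[T], Long) = {
  val rng = new Random(seed)
  val reservoir = new scala.collection.mutable.ArrayBuffer[T](k)
  var count = 0L
  while (input.hasNext) {
    val item = input.next()
    count += 1
    if (reservoir.length < k) {
      reservoir += item                          // fill the reservoir first
    } else {
      val j = (rng.nextDouble() * count).toLong  // uniform slot in [0, count)
      if (j < k) reservoir(j.toInt) = item       // keep item with probability k / count
    }
  }
  (reservoir.toArray, count)                     // the sample plus the total number of items seen
}

// Example: sample 5 keys out of 1000 without knowing the total in advance.
val (sample, total) = reservoirSample((1 to 1000).iterator, 5)
println(s"total=$total sample=${sample.mkString(",")}")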
If you only want to understand the partitioning strategy, you can read the Spark 1.1 code directly. The code after 1.1 is a performance optimization (reservoir sampling reduces the number of full passes over the data), but the strategy is essentially the same: sample the keys to obtain the boundary values between partitions, and then, for each incoming key, determine which boundary range it falls into and assign it to that partition. It follows that the partitions are ordered relative to one another, meaning every element in partition A is smaller than every element in partition B, while the data inside a single partition remains unordered.
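As an illustration of that boundary lookup, here is a simplified sketch (not the real getPartition, which also handles descending order and switches to binary search when there are many bounds):

// rangeBounds holds the upper bound of each of the first (partitions - 1)
// partitions, in ascending order; keys above the last bound go to the last partition.
def rangeLookup[K](key: K, rangeBounds: Array[K])(implicit ord: Ordering[K]): Int = {
  var partition = 0
  while (partition < rangeBounds.length && ord.gt(key, rangeBounds(partition))) {
    partition += 1
  }
  partition
}

// Example: bounds (10, 20) split Int keys into three ranges: (-inf, 10], (10, 20], (20, +inf).
println(rangeLookup(5, Array(10, 20)))   // 0
println(rangeLookup(15, Array(10, 20)))  // 1
println(rangeLookup(99, Array(10, 20)))  // 2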