
Spark Growth Path (4): The Partitioner System (Overview, Partitioner, HashPartitioner, RangePartitioner)

Reference article: Spark partitioners HashPartitioner and RangePartitioner, a detailed code walkthrough

Partitioners

Overview

[Overview diagram of Spark's partitioner class hierarchy]

They break down by package as follows:

  • org.apache.spark: HashPartitioner, RangePartitioner
  • org.apache.spark.scheduler: CoalescedPartitioner
  • org.apache.spark.sql.execution: CoalescedPartitioner
  • org.apache.spark.mllib.linalg.distributed: GridPartitioner
  • org.apache.spark.sql.execution: PartitionIdPassthrough
  • org.apache.spark.api.python: PythonPartitioner

That makes seven partitioners in total. This post focuses on HashPartitioner and RangePartitioner under org.apache.spark.

Note that partitioners only apply to RDDs of (K, V) pairs.
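
As a quick illustration (a sketch only; the sample data, app name, master setting, and partition count are invented for this example), applying a partitioner to a pair RDD looks like this:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionByDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partitionBy-demo").setMaster("local[2]"))

    // partitionBy is only available on (K, V) RDDs, via PairRDDFunctions.
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4)))
    val partitioned = pairs.partitionBy(new HashPartitioner(4))

    println(partitioned.partitioner)       // Some(org.apache.spark.HashPartitioner@...)
    println(partitioned.getNumPartitions)  // 4

    sc.stop()
  }
}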

Partitioner

Partitioner is an abstract class that defines the members every partitioner must provide:

abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}
           
  • numPartitions: returns the number of partitions.
  • getPartition: returns the partition ID for a given key.
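
These two members are all a custom partitioner has to supply. As a minimal sketch (the class name and the even/odd keying rule are invented for this example and are not part of Spark):

import org.apache.spark.Partitioner

// Hypothetical example: send even integer keys to partition 0 and odd keys to partition 1.
class EvenOddPartitioner extends Partitioner {
  override def numPartitions: Int = 2

  override def getPartition(key: Any): Int = key match {
    case null   => 0
    case i: Int => math.abs(i % 2)            // 0 for even keys, 1 for odd keys
    case other  => math.abs(other.hashCode % 2)
  }

  // equals/hashCode let Spark recognize two instances as the same partitioning.
  override def equals(other: Any): Boolean = other.isInstanceOf[EvenOddPartitioner]
  override def hashCode: Int = numPartitions
}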

The Partitioner class also has a companion object, which supplies a default partitioner:

object Partitioner {
  /**
   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
   *
   * If any of the RDDs already has a partitioner, choose that one.
   *
   * Otherwise, we use a default HashPartitioner. For the number of partitions, if
   * spark.default.parallelism is set, then we'll use the value from SparkContext
   * defaultParallelism, otherwise we'll use the max number of upstream partitions.
   *
   * Unless spark.default.parallelism is set, the number of partitions will be the
   * same as the number of partitions in the largest upstream RDD, as this should
   * be least likely to cause out-of-memory errors.
   *
   * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
   */
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val rdds = (Seq(rdd) ++ others)
    val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
    if (hasPartitioner.nonEmpty) {
      hasPartitioner.maxBy(_.partitions.length).partitioner.get
    } else {
      if (rdd.context.conf.contains("spark.default.parallelism")) {
        new HashPartitioner(rdd.context.defaultParallelism)
      } else {
        new HashPartitioner(rdds.map(_.partitions.length).max)
      }
    }
  }
}
           

The defaultPartitioner method spells out the strategy for choosing a default partitioner: if any of the parent RDDs already has a partitioner, reuse the partitioner of the parent RDD with the most partitions. If none of the parents has a partitioner (i.e. they are not partitioned pair RDDs), return a HashPartitioner, whose partition count is determined in one of two ways: if spark.default.parallelism is set, use that value; otherwise use the largest partition count among the parent RDDs.
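
A hedged sketch of how this plays out in practice (the data, partition counts, and local master below are assumptions made up for the demo): when two pair RDDs without partitioners are joined and spark.default.parallelism is not set, the shuffle should fall back to a HashPartitioner sized to the larger parent.

import org.apache.spark.{SparkConf, SparkContext}

object DefaultPartitionerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("default-partitioner-demo").setMaster("local[2]"))

    val left  = sc.parallelize(Seq(("a", 1), ("b", 2)), numSlices = 3)     // 3 partitions, no partitioner
    val right = sc.parallelize(Seq(("a", "x"), ("c", "y")), numSlices = 5) // 5 partitions, no partitioner

    // Neither parent has a partitioner and spark.default.parallelism is not set here,
    // so the join should fall back to new HashPartitioner(max(3, 5)) = HashPartitioner(5).
    val joined = left.join(right)
    println(joined.getNumPartitions)  // expected: 5

    sc.stop()
  }
}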

HashPartitioner

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
           

This one is relatively simple: the partition ID is the key's hashCode modulo the number of partitions; if the remainder is negative, the number of partitions is added to it, and the resulting value is the partition ID the key belongs to.
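
Utils.nonNegativeMod is an internal Spark helper, so here is a standalone sketch of the logic described above (a reimplementation for illustration, not the Spark source itself):

// Sketch of the "non-negative modulo" idea used by HashPartitioner.getPartition.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  // In Scala/Java, % can return a negative value for negative x; shift it back into [0, mod).
  rawMod + (if (rawMod < 0) mod else 0)
}

// e.g. nonNegativeMod(-7, 4) == 1, while -7 % 4 == -3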

RangePartitioner

class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }
           

I will not walk through this line by line; the reference article already does that. The important piece is the boundary computation, which relies on reservoir sampling, an algorithm for drawing a sample when the total number of items is not known in advance.

If you only want to understand the partitioning strategy, you can read the Spark 1.1 code directly. The code after 1.1 is a performance optimization (reservoir sampling cuts down the number of full passes over the data), but the strategy is essentially the same: sample the keys to obtain boundary values between partitions, and for each incoming key, find the boundary range it falls into and assign it to that partition. It follows that the partitions themselves are ordered, meaning every element in partition A is smaller than every element in partition B, while the data inside a partition is unordered.
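
As a hedged, generic sketch of the reservoir-sampling idea (a standalone reimplementation, not Spark's RangePartitioner.sketch): keep a fixed-size "reservoir", fill it with the first k items, then replace a random slot with decreasing probability so that every item seen so far ends up in the reservoir with equal probability.

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Generic reservoir sampling: draw up to k items uniformly from an iterator of unknown length.
def reservoirSample[T](input: Iterator[T], k: Int, rng: Random = new Random()): Seq[T] = {
  val reservoir = ArrayBuffer.empty[T]
  var seen = 0
  for (item <- input) {
    if (reservoir.length < k) {
      reservoir += item                  // the first k items go straight into the reservoir
    } else {
      val j = rng.nextInt(seen + 1)      // uniform index in [0, seen]
      if (j < k) reservoir(j) = item     // keep the new item with probability k / (seen + 1)
    }
    seen += 1
  }
  reservoir.toSeq
}

// Example: 10 keys drawn uniformly without knowing the stream length up front.
// val sample = reservoirSample(Iterator.range(0, 1000000), 10)

Spark's sketch runs this per input partition and also records each partition's item count, which is where the per-sample weights (n.toDouble / sample.length) in the rangeBounds code above come from.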