天天看點

Spark分區器探索(HashPartitioner、RangePartitioner)

以Spark2.X為例,其支援Hash、Range以及自定義分區器。

分區器決定了rdd資料在分布式運算時的分區個數以及資料在shuffle中發往的分區号,而分區的個數決定了reduce的個數;同樣的shuffle過程中若分區器定義或選擇不合适将大大增加資料傾斜的風險。綜上,分區器的重要性不言而喻。

首先要知道

(1)Key-Value類型RDD才有分區器,非Key-Value類型RDD的分區值是None。

(2)每個RDD的分區ID範圍為0~numPartitions-1,其決定資料所屬分區。

Hash分區

對于給定的key,計算其hashCode并對分區個數取餘。如果餘數小于0,則用餘數+分區的個數(否則加0),最後傳回的值就是這個key所屬的分區ID

弊端:

可能導緻每個分區中資料量的不均勻,導緻資料傾斜問題。

源碼如下:

/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
           

Ranger分區

1、将一定範圍内的資料映射到某一分區内,盡量保證資料的均勻分布。

2、分區間的資料是有序的,但分區内的元素不能保證有序的。如分區1内的資料為[1,5)内的整數值且無序,分區2的資料為[5,10)内的整數且無序,但分區1的資料全體小于分區2的資料。

實作過程:

第一步:從整個RDD中抽取樣本資料進行排序得到每個分區的最大key值,而後形成一個Array[KEY],也就是key值範圍rangeBounds。

第二步:判斷key在rangeBounds中所處的範圍,給出該key值在下一個RDD中的分區id下标。

部分源碼如下:

/**
 * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
 * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
 *
 * @note The actual number of partitions created by the RangePartitioner might not be the same
 * as the `partitions` parameter, in the case where the number of sampled records is less than
 * the value of `partitions`.
 */
class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }
  }
}
           

從上我們知道在擷取rangeBounds時涉及到排序,也就是說該分區器要求RDD中的KEY類型是可以排序的。預設情況下采用的hash分區器。

If any of the RDDs already has a partitioner, choose that one.Otherwise, we use a default HashPartitioner.

而實際上在生産環境中很多情況下key都是不可排序的複雜資料類型,同時由于實際業務需求的多樣性,系統自帶的分區器無法滿足的情況下,自定義分區器就上場了。

Spark自定義分區器簡單示例

繼續閱讀