天天看點

RDD :分區 & perfectRDD 分區

目錄

RDD 分區

分區

分區實作

分區個數

分區内部記錄個數

RDD 分區

分區

先回答第一個問題:

RDD 内部,如何表示并行計算的一個計算單元?

答案是使用分區(Partition)

RDD 内部的資料集合在邏輯上和實體上被劃分成多個小子集合,這樣的每一個子集合我們将其稱為分區。

分區的個數會決定并行計算的粒度。

而每一個分區   數值的計算都是在一個單獨的任務中進行,是以并行任務的個數,也是由 RDD(實際上是一個階段的末 RDD,排程章節會介紹)分區的個數決定的。

我會在 1.2 小節以及第二章中,具體說明分區與并行計算的關系。

在後文中,我會用下圖所示圖形來表示 RDD 以及 RDD 内部的分區,RDD 上方文字表示該 RDD 的類型或者名字,分區顔色為紫紅色表示該 RDD 内資料被緩存到存儲媒體中,藍色表示該 RDD 為普通 RDD。

RDD :分區 & perfectRDD 分區

分區實作

分區的源碼實作為 

Partition

 類。

/**
 * An identifier for a partition in an RDD.
 */
trait Partition extends Serializable {
  /**
   * Get the partition's index within its parent RDD
   */
  def index: Int

  // A better default implementation of HashCode
  override def hashCode(): Int = index
}
           

RDD 隻是資料集的抽象,分區内部并不會存儲具體的資料。

Partition

 類内包含一個 

index

 成員,表示該分區在 RDD 内的編号,通過 RDD 編号 + 分區編号可以唯一确定該分區對應的塊編号,利用底層資料存儲層提供的接口,就能從存儲媒體(如:HDFS、Memory)中提取出分區對應的資料。

RDD

 抽象類中定義了 

_partitions

 數組成員和 

partitions

 方法,

partitions

 方法提供給外部開發者調用,用于擷取 RDD 的所有分區。

partitions

 方法會調用内部 

getPartitions

 接口,

RDD

 的子類需要自行實作 

getPartitions

 接口。

@transient private var partitions_ : Array[Partition] = null

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getPartitions: Array[Partition]

  /**
   * Get the array of partitions of this RDD, taking into account whether the
   * RDD is checkpointed or not.
   */
  final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions
      }
      partitions_
    }
  }
           

以 

map

 轉換操作生成 

MapPartitionsRDD

 類中的 

getPartitions

 方法為例。

override def getPartitions: Array[Partition] = firstParent[T].partitions
           

可以看到,

MapPartitionsRDD

 的分區實際上與父 RDD 的分區完全一緻,這也符合我們對 

map

 轉換操作的認知。

分區個數

RDD 分區的一個配置設定原則是:

盡可能使得分區的個數,等于叢集核心數目。

RDD 可以通過建立操作或者轉換操作得到。

轉換操作中:

分區的個數會根據轉換操作對應多個 RDD 之間的依賴關系确定。

窄依賴子 RDD 由父 RDD 分區個數決定。

Shuffle 依賴由子 RDD 分區器決定。

建立操作中:

程式開發者可以手動指定分區的個數。

例如 

sc.parallelize (Array(1, 2, 3, 4, 5), 2)

 表示建立得到的 RDD 分區個數為 2。

在沒有指定分區個數的情況下,Spark 會根據叢集部署模式,來确定一個分區個數預設值。

分别讨論 

parallelize

 和

textFile

 兩種通過外部資料建立生成RDD的方法。

對于 

parallelize

 方法,預設情況下,分區的個數會受 Apache Spark 配置參數 

spark.default.parallelism

 的影響,官方對該參數的解釋是用于控制 Shuffle 過程中預設使用的任務數量,這也符合我們之間對分區個數與任務個數之間關系的了解。

/** Distribute a local Scala collection to form an RDD.
   *
   * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
   * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
   * modified collection. Pass a copy of the argument to avoid this.
   * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
   * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
   */
  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }
           

無論是以本地模式、Standalone 模式、Yarn 模式或者是 Mesos 模式來運作 Apache Spark,分區的預設個數等于對 

spark.default.parallelism

 的指定值。

若該值未設定,則 Apache Spark 會根據不同叢集模式的特征,來确定這個值。

對于本地模式,預設分區個數等于本地機器的 CPU 核心總數(或者是使用者通過 

local[N]

 參數指定配置設定給 Apache Spark 的核心數目,見 

LocalBackend

 類)。

顯然這樣設定是合理的,因為把每個分區的計算任務傳遞給單個核心執行,能夠保證最大的計算效率。

override def defaultParallelism() =
    scheduler.conf.getInt("spark.default.parallelism", totalCores)
           

若使用 Apache Mesos 作為叢集的資源管理系統,預設分區個數等于 8(對 Apache Mesos 不是很了解,根據這個 

TODO

,個人猜測 Apache Spark 暫時還無法擷取 Mesos 叢集的核心總數)(見 

MesosSchedulerBackend

 類)。

// TODO: query Mesos for number of cores
  override def defaultParallelism(): Int = sc.conf.getInt("spark.default.parallelism", 8)
           

其他叢集模式(Standalone 或者 Yarn),預設分區個數等于叢集中所有核心數目的總和,或者 2,取兩者中的較大值。(見 

CoarseGrainedSchedulerBackend

 類)。

override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  }
           

對于 

textFile

 方法,預設分區個數等于 

min(defaultParallelism, 2)

(見 

SparkContext

 類),而 

default.Parallelism

 實際上就是 

parallelism

 方法的預設分區值。

/**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString)
  }
           

分區内部記錄個數

分區配置設定的另一個配置設定原則是:盡可能使同一 RDD 不同分區内的記錄的數量一緻。

對于轉換操作得到的 RDD,如果是窄依賴,則分區記錄數量依賴于父 RDD 中相同編号分區是如何進行資料配置設定的,如果是 Shuffle 依賴,則分區記錄數量依賴于選擇的分區器,哈希分區器無法保證資料被平均配置設定到各個分區,而範圍分區器則能做到這一點。這部分内容我會在 1.6 小節中讨論。

分區器:https://ihainan.gitbooks.io/spark-source-code/content/section1/partitioner.html

parallelize

 方法通過把輸入的數組做一次平均配置設定,嘗試着讓每個分區的記錄個數盡可能大緻相同。(見 

ParallelCollectionRDD

 類)。

private object ParallelCollectionRDD {
  /**
   * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
   * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
   * it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
   * is an inclusive Range, we use inclusive range for the last slice.
   */
  def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    if (numSlices < 1) {
      throw new IllegalArgumentException("Positive number of slices required")
    }
    // Sequences need to be sliced at the same set of index positions for operations
    // like RDD.zip() to behave as expected
    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      (0 until numSlices).iterator.map(i => {
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      })
    }
    seq match {
      case r: Range => {
        positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
          // If the range is inclusive, use inclusive range for the last slice
          if (r.isInclusive && index == numSlices - 1) {
            new Range.Inclusive(r.start + start * r.step, r.end, r.step)
          }
          else {
            new Range(r.start + start * r.step, r.start + end * r.step, r.step)
          }
        }).toSeq.asInstanceOf[Seq[Seq[T]]]
      }
      case nr: NumericRange[_] => {
        // For ranges of Long, Double, BigInteger, etc
        val slices = new ArrayBuffer[Seq[T]](numSlices)
        var r = nr
        for ((start, end) <- positions(nr.length, numSlices)) {
          val sliceSize = end - start
          slices += r.take(sliceSize).asInstanceOf[Seq[T]]
          r = r.drop(sliceSize)
        }
        slices
      }
      case _ => {
        val array = seq.toArray // To prevent O(n^2) operations for List etc
        positions(array.length, numSlices).map({
          case (start, end) =>
            array.slice(start, end).toSeq
        }).toSeq
      }
    }
  }
}
           

textFile

 方法分區内資料的大小則是由 Hadoop API 接口 

FileInputFormat.getSplits

 方法決定(見 

HadoopRDD

 類),得到的每一個分片即為 RDD 的一個分區,分片内資料的大小會受檔案大小、檔案是否可分割、HDFS 中塊大小等因素的影響,但總體而言會是比較均衡的配置設定。

override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
    val inputFormat = getInputFormat(jobConf)
    if (inputFormat.isInstanceOf[Configurable]) {
      inputFormat.asInstanceOf[Configurable].setConf(jobConf)
    }
    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
    array
  }
           

參考:https://blog.csdn.net/songsehaiyang/article/details/54692228 

RDD