目錄
RDD 分區
分區
分區實作
分區個數
分區内部記錄個數
RDD 分區
分區
先回答第一個問題:
RDD 内部,如何表示并行計算的一個計算單元?
答案是使用分區(Partition)
RDD 内部的資料集合在邏輯上和實體上被劃分成多個小子集合,這樣的每一個子集合我們将其稱為分區。
分區的個數會決定并行計算的粒度。
而每一個分區 數值的計算都是在一個單獨的任務中進行,是以并行任務的個數,也是由 RDD(實際上是一個階段的末 RDD,排程章節會介紹)分區的個數決定的。
我會在 1.2 小節以及第二章中,具體說明分區與并行計算的關系。
在後文中,我會用下圖所示圖形來表示 RDD 以及 RDD 内部的分區,RDD 上方文字表示該 RDD 的類型或者名字,分區顔色為紫紅色表示該 RDD 内資料被緩存到存儲媒體中,藍色表示該 RDD 為普通 RDD。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLu9Wa0lGdyFGUk5WQERkUvw1cu9Wa0lGdyFGUERkUvwVMu9Wa0NWZz9CXzV2Zh1WavwVYpRWZt9CX05WZ052bj9CXlR2bj1SZjJXdvNXLrJXYwN3Lc9WauM3av9mY0l2Zu4WYulWYol2Lc9CX6MHc0RHaiojIsJye.png)
分區實作
分區的源碼實作為
Partition
類。
/**
* An identifier for a partition in an RDD.
*/
trait Partition extends Serializable {
/**
* Get the partition's index within its parent RDD
*/
def index: Int
// A better default implementation of HashCode
override def hashCode(): Int = index
}
RDD 隻是資料集的抽象,分區内部并不會存儲具體的資料。
Partition
類内包含一個
index
成員,表示該分區在 RDD 内的編号,通過 RDD 編号 + 分區編号可以唯一确定該分區對應的塊編号,利用底層資料存儲層提供的接口,就能從存儲媒體(如:HDFS、Memory)中提取出分區對應的資料。
RDD
抽象類中定義了
_partitions
數組成員和
partitions
方法,
partitions
方法提供給外部開發者調用,用于擷取 RDD 的所有分區。
partitions
方法會調用内部
getPartitions
接口,
RDD
的子類需要自行實作
getPartitions
接口。
@transient private var partitions_ : Array[Partition] = null
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getPartitions: Array[Partition]
/**
* Get the array of partitions of this RDD, taking into account whether the
* RDD is checkpointed or not.
*/
final def partitions: Array[Partition] = {
checkpointRDD.map(_.partitions).getOrElse {
if (partitions_ == null) {
partitions_ = getPartitions
}
partitions_
}
}
以
map
轉換操作生成
MapPartitionsRDD
類中的
getPartitions
方法為例。
override def getPartitions: Array[Partition] = firstParent[T].partitions
可以看到,
MapPartitionsRDD
的分區實際上與父 RDD 的分區完全一緻,這也符合我們對
map
轉換操作的認知。
分區個數
RDD 分區的一個配置設定原則是:
盡可能使得分區的個數,等于叢集核心數目。
RDD 可以通過建立操作或者轉換操作得到。
轉換操作中:
分區的個數會根據轉換操作對應多個 RDD 之間的依賴關系确定。
窄依賴子 RDD 由父 RDD 分區個數決定。
Shuffle 依賴由子 RDD 分區器決定。
建立操作中:
程式開發者可以手動指定分區的個數。
例如
sc.parallelize (Array(1, 2, 3, 4, 5), 2)
表示建立得到的 RDD 分區個數為 2。
在沒有指定分區個數的情況下,Spark 會根據叢集部署模式,來确定一個分區個數預設值。
分别讨論
parallelize
和
textFile
兩種通過外部資料建立生成RDD的方法。
對于
parallelize
方法,預設情況下,分區的個數會受 Apache Spark 配置參數
spark.default.parallelism
的影響,官方對該參數的解釋是用于控制 Shuffle 過程中預設使用的任務數量,這也符合我們之間對分區個數與任務個數之間關系的了解。
/** Distribute a local Scala collection to form an RDD.
*
* @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
* to parallelize and before the first action on the RDD, the resultant RDD will reflect the
* modified collection. Pass a copy of the argument to avoid this.
* @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
* RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
*/
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
無論是以本地模式、Standalone 模式、Yarn 模式或者是 Mesos 模式來運作 Apache Spark,分區的預設個數等于對
spark.default.parallelism
的指定值。
若該值未設定,則 Apache Spark 會根據不同叢集模式的特征,來确定這個值。
對于本地模式,預設分區個數等于本地機器的 CPU 核心總數(或者是使用者通過
local[N]
參數指定配置設定給 Apache Spark 的核心數目,見
LocalBackend
類)。
顯然這樣設定是合理的,因為把每個分區的計算任務傳遞給單個核心執行,能夠保證最大的計算效率。
override def defaultParallelism() =
scheduler.conf.getInt("spark.default.parallelism", totalCores)
若使用 Apache Mesos 作為叢集的資源管理系統,預設分區個數等于 8(對 Apache Mesos 不是很了解,根據這個
TODO
,個人猜測 Apache Spark 暫時還無法擷取 Mesos 叢集的核心總數)(見
MesosSchedulerBackend
類)。
// TODO: query Mesos for number of cores
override def defaultParallelism(): Int = sc.conf.getInt("spark.default.parallelism", 8)
其他叢集模式(Standalone 或者 Yarn),預設分區個數等于叢集中所有核心數目的總和,或者 2,取兩者中的較大值。(見
CoarseGrainedSchedulerBackend
類)。
override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
對于
textFile
方法,預設分區個數等于
min(defaultParallelism, 2)
(見
SparkContext
類),而
default.Parallelism
實際上就是
parallelism
方法的預設分區值。
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString)
}
分區内部記錄個數
分區配置設定的另一個配置設定原則是:盡可能使同一 RDD 不同分區内的記錄的數量一緻。
對于轉換操作得到的 RDD,如果是窄依賴,則分區記錄數量依賴于父 RDD 中相同編号分區是如何進行資料配置設定的,如果是 Shuffle 依賴,則分區記錄數量依賴于選擇的分區器,哈希分區器無法保證資料被平均配置設定到各個分區,而範圍分區器則能做到這一點。這部分内容我會在 1.6 小節中讨論。
分區器:https://ihainan.gitbooks.io/spark-source-code/content/section1/partitioner.html
parallelize
方法通過把輸入的數組做一次平均配置設定,嘗試着讓每個分區的記錄個數盡可能大緻相同。(見
ParallelCollectionRDD
類)。
private object ParallelCollectionRDD {
/**
* Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
* collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
* it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
* is an inclusive Range, we use inclusive range for the last slice.
*/
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of slices required")
}
// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map(i => {
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
})
}
seq match {
case r: Range => {
positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
}
else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}
}).toSeq.asInstanceOf[Seq[Seq[T]]]
}
case nr: NumericRange[_] => {
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
for ((start, end) <- positions(nr.length, numSlices)) {
val sliceSize = end - start
slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize)
}
slices
}
case _ => {
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map({
case (start, end) =>
array.slice(start, end).toSeq
}).toSeq
}
}
}
}
textFile
方法分區内資料的大小則是由 Hadoop API 接口
FileInputFormat.getSplits
方法決定(見
HadoopRDD
類),得到的每一個分片即為 RDD 的一個分區,分片内資料的大小會受檔案大小、檔案是否可分割、HDFS 中塊大小等因素的影響,但總體而言會是比較均衡的配置設定。
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
val inputFormat = getInputFormat(jobConf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(jobConf)
}
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
}
參考:https://blog.csdn.net/songsehaiyang/article/details/54692228