簡介
spark一個最重要的特性就是對資料集在各個節點的分區進行控制。控制資料分布可以減少網絡開銷,極大地提升整體性能。
隻有Pair RDD才有分區,非Pair RDD分區的值是None。如果RDD隻被掃描一次,沒必要預先分區處理;如果RDD多次在諸如連接配接這種基于鍵的操作中使用時,分區才有作用。
分區器
分區器決定了RDD的分區個數及每條資料最終屬于哪個分區。
spark提供了兩個分區器:HashPartitioner和RangePartitioner,它們都繼承于org.apache.spark.Partitioner類并實作三個方法。
- numPartitions: Int: 指定分區數
- getPartition(key: Any): Int: 分區編号(0~numPartitions-1)
- equals(): 檢查分區器對象是否和其他分區器執行個體相同,判斷兩個RDD分區方式是否一樣。
HashPartitioner分區
HashPartitioner分區執行原理:對于給定的key,計算其hashCode,再除以分區數取餘,最後的值就是這個key所屬的分區ID。實作如下:
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
RangePartitioner分區
HashPartitioner分區可能導緻每個分區中資料量的不均勻。而RangePartitioner分區則盡量保證每個分區中資料量的均勻,将一定範圍内的數映射到某一個分區内。分區與分區之間資料是有序的,但分區内的元素是不能保證順序的。
RangePartitioner分區執行原理:
- 計算總體的資料抽樣大小sampleSize,計算規則是:至少每個分區抽取20個資料或者最多1M的資料量。
- 根據sampleSize和分區數量計算每個分區的資料抽樣樣本數量sampleSizePrePartition
- 調用RangePartitioner的sketch函數進行資料抽樣,計算出每個分區的樣本。
- 計算樣本的整體占比以及資料量過多的資料分區,防止資料傾斜。
- 對于資料量比較多的RDD分區調用RDD的sample函數API重新進行資料抽取。
- 将最終的樣本資料通過RangePartitoner的determineBounds函數進行資料排序配置設定,計算出rangeBounds。
class RangePartitioner[K: Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
// We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
// 擷取RDD中K類型資料的排序器
private var ordering = implicitly[Ordering[K]]
// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
if (partitions <= 1) {
// 如果給定的分區數小于等于1的情況下,直接傳回一個空的集合,表示資料不進行分區
Array.empty
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
// 給定總的資料抽樣大小,最多1M的資料量(10^6),最少20倍的RDD分區數量,也就是每個RDD分區至少抽取20條資料
val sampleSize = math.min(20.0 * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
// RDD各分區中的資料量可能會出現傾斜的情況,乘于3的目的就是保證資料量小的分區能夠采樣到足夠的資料,而對于資料量大的分區會進行第二次采樣
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
// 從rdd中抽取資料,傳回值:(總rdd資料量, Array[分區id,目前分區的資料量,目前分區抽取的資料])
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {
// 如果總的資料量為0(RDD為空),那麼直接傳回一個空的數組
Array.empty
} else {
// If a partition contains much more than the average number of items, we re-sample from it
// to ensure that enough items are collected from that partition.
// 計算總樣本數量和總記錄數的占比,占比最大為1.0
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
// 儲存樣本資料的集合buffer
val candidates = ArrayBuffer.empty[(K, Float)]
// 儲存資料分布不均衡的分區id(資料量超過fraction比率的分區)
val imbalancedPartitions = mutable.Set.empty[Int]
// 計算抽取出來的樣本資料
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
// 如果fraction乘以目前分區中的資料量大于之前計算的每個分區的抽象資料大小,那麼表示目前分區抽取的資料太少了,該分區資料分布不均衡,需要重新抽取
imbalancedPartitions += idx
} else {
// 目前分區不屬于資料分布不均衡的分區,計算占比權重,并添加到candidates集合中
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.size).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
// 對于資料分布不均衡的RDD分區,重新進行資料抽樣
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability.
// 擷取資料分布不均衡的RDD分區,并構成RDD
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
// 随機種子
val seed = byteswap32(-rdd.id - 1)
// 利用rdd的sample抽樣函數API進行資料抽樣
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
// 将最終的抽樣資料計算出rangeBounds出來
RangePartitioner.determineBounds(candidates, partitions)
}
}
}
// 下一個RDD的分區數量是rangeBounds數組中元素數量+ 1個
def numPartitions: Int = rangeBounds.length + 1
// 二分查找器,内部使用java中的Arrays類提供的二分查找方法
private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
// 根據RDD的key值傳回對應的分區id。從0開始
def getPartition(key: Any): Int = {
// 強制轉換key類型為RDD中原本的資料類型
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// If we have less than 128 partitions naive search
// 如果分區資料小于等于128個,那麼直接本地循環尋找目前k所屬的分區下标
while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
} else {
// Determine which binary search method to use only once.
// 如果分區數量大于128個,那麼使用二分查找方法尋找對應k所屬的下标;
// 但是如果k在rangeBounds中沒有出現,實質上傳回的是一個負數(範圍)或者是一個超過rangeBounds大小的數(最後一個分區,比所有資料都大)
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition - 1
}
if (partition > rangeBounds.length) {
partition = rangeBounds.length
}
}
// 根據資料排序是升序還是降序進行資料的排列,預設為升序
if (ascending) {
partition
} else {
rangeBounds.length - partition
}
}
影響分區的算子操作
影響分區的算子操作有:cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、partitionBy()、repartition()、coalesce()、sort()、mapValues()(如果父RDD有分區方式)、flatMapValues()(如果父RDD有分區方式)。
對于執行兩個RDD的算子操作,輸出資料的分區方式取決于父RDD的分區方式。預設情況下,結果會采用哈希分區,分區的數量和操作的并行度一樣。不過,如果其中一個父RDD設定過分區方式,結果就采用那種分區方式;如果兩個父RDD都設定過分區方式,結果RDD采用第一個父RDD的分區方式。
repartition和partitionBy的差別
repartition 和 partitionBy 都是對資料進行重新分區,預設都是使用 HashPartitioner。但是二者之間的差別有:
- partitionBy隻能用于Pair RDD
- 都作用于Pair RDD時,結果也不一樣
其實partitionBy的結果才是我們所預期的。repartition 其實使用了一個随機生成的數來當作 key,而不是使用原來的key。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions).values
} else {
new CoalescedRDD(this, numPartitions)
}
}
repartition和coalesce的差別
兩個算子都是對RDD的分區進行重新劃分,repartition隻是coalesce接口中shuffle為true的簡易實作,(假設RDD有N個分區,需要重新劃分成M個分區)
- N<M。一般情況下N個分區有資料分布不均勻的狀況,利用HashPartitioner函數将資料重新分區為M個,這時需要将shuffle設定為true。
- 如果N>M并且N和M相差不多(假如N是1000,M是100),這時可以将shuffle設定為false,不進行shuffle過程,父RDD和子RDD之間是窄依賴關系。在shuffle為false的情況下,如果N<M時,coalesce是無效的。
- 如果N>M并且兩者相差懸殊,這時如果将shuffle設定為false,父子RDD是窄依賴關系,同處在一個Stage中,就可能造成spark程式的并行度不夠,進而影響性能。如果在M為1的時候,為了使coalesce之前的操作有更好的并行度,可以将shuffle設定為true。
執行個體分析
需求
統計使用者通路其未訂閱主題頁面的情況。
- 使用者資訊表:由(UserID,UserInfo)組成的RDD,UserInfo包含該使用者所訂閱的主題清單。
- 事件表:由(UserID,LinkInfo)組成的RDD,存放着每五分鐘内網站各使用者通路情況。
代碼實作
val sc = new SparkContext()
val userData = sc.sequenceFile[UserID,LinkInfo]("hdfs://...").persist
def processNewLogs(logFileName:String){
val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
//RDD of (UserID,(UserInfo,LinkInfo)) pairs
val joined = usersData.join(events)
val offTopicVisits = joined.filter {
// Expand the tuple into its components
case (userId, (userInfo, linkInfo)) =>
!userInfo.topics.contains(linkInfo.topic)
}.count()
println("Number of visits to non-subscribed opics: " + offTopicVisits)
}
缺點
連接配接操作會将兩個資料集中的所有鍵的哈希值都求出來,将哈希值相同的記錄通過網絡傳到同一台機器上,然後再對所有鍵相同的記錄進行連接配接操作。userData表資料量很大,是以這樣進行哈希計算和跨節點資料混洗非常耗時。
改進代碼實作
val userData = sc.sequenceFile[UserID,LinkInfo]("hdfs://...")
.partionBy(new HashPartiotioner(100))
.persist()