天天看點

Spark源碼解讀由淺入深 寬窄依賴篇

文章目錄

    • 第一部分 寬窄依賴篇
      • 1.依賴關系概述
      • 2.依賴分類
        • 2.1` Dependency `繼承 Serializable
        • 2.2 `NarrowDependency `窄依賴,繼承 Dependency
        • 2.3` OneToOneDependency` 一對一依賴,繼承NarrowDependency
        • 2.4 `RangeDependency` 範圍依賴,繼承NarrowDependency
        • 2.5 ShuffleDependency 寬依賴,繼承Dependency

第一部分 寬窄依賴篇

1.依賴關系概述

1.1 依賴關系、血緣關系: 在 Spark 中,RDD 分區的資料不支援修改,是隻讀的。如果想更新 RDD 分區中的資料,那麼隻能對原有 RDD 進行轉化操作,也就是在原來 RDD 基礎上建立一個新的RDD。新的RDD依賴于舊的RDD,相領的兩個RDD的關系成為依賴關系,多個連續的RDD的依賴關系,稱為

血緣關系

1.2 血緣關系儲存: 在計算的過程當中,RDD不會儲存資料,為了提高容錯性,RDD将自己與之前舊RDD

血緣關系儲存下來

,當計算出現錯誤時,可以根據血緣關系将

資料源重新讀取

進行計算。

1.3 檢視方法: 我們可以調用RDD的相關方法檢視對應的

依賴

血緣

關系

rdd.todebugstring:列印血緣關系
    rdd.dependencies:列印依賴關系
           

2.依賴分類

檢視Spark源碼中的

Dependency.scala

檔案,其中包含了

Dependency,NarrowDependency、OneToOneDependency、RangeDependency、ShuffleDependency.

總體而言,Spark的依賴關系可以分為NarrowDependency

窄依賴

和ShuffleDependenc

寬依賴

,因為其中OneToOneDependency和RangeDependency都繼承了窄依賴。

我們可以這樣認為:

(1)窄依賴:每個parent RDD 的 partition 最多被 child RDD 的

一個

partition 使用。

(2)寬依賴:每個parent RDD partition 被

多個

child RDD 的partition 使用。

2.1

Dependency

繼承 Serializable

源碼如下,可以看出,

Dependency

隻是依賴的一個基類,繼承了可序列化接口,并且是一個抽象類。

/**
* :: DeveloperApi ::
* Base class for dependencies.
*/
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}
           

2.2

NarrowDependency

窄依賴,繼承 Dependency

相關算子:filter、map、flatMap、sample、union、intersection、mapPartitions、mapPartitionsWithIndex、coalesce、zip

NarrowDependency

也是一個抽象類,在官方文檔中 NarrowDependency 的描述為:

/**
* :: DeveloperApi ::
* Base class for dependencies where each partition of the child RDD depends on a small number
* of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
*/
           

可以了解為:

NarrowDependency

窄依賴是一個子RDD的每個分區依賴于父 RDD 的少量分區的依賴關系的基類。窄依賴關系允許任務如同一條流水線執行,不用等待其他分區任務執行完畢。

源碼如下:

定義了抽象方法getParents用來擷取子分區的父分區, partitionId 表示子 RDD 的一個分區,方法傳回子分區所依賴的父 RDD 的分區。

@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}
           

窄依賴的具體實作有

OneToOneDependency

RangeDependency

2.3

OneToOneDependency

一對一依賴,繼承NarrowDependency

官方文檔中 OneToOneDependency 描述為:

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
           

可以了解為:父RDD和子RDD的分區之間隻有一對一的依賴關系,并且 child RDD 的分區數和 parent RDD 的分區數相同。即父RDD的partiton最多被下遊子RDD的一個partition使用,可以想象為獨生子女。這種依賴關系對應的轉換算子有map()、flatMap()、filter()等。

對應源碼如下:

該部分重寫了Dependency的getParents方法,partitionId表示子 RDD 的一個分區。

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
           

2.4

RangeDependency

範圍依賴,繼承NarrowDependency

官方文檔中 RangeDependency 描述為:

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 */
           

可以了解為:父 RDD 和子 RDD 中有分區範圍之間的一對一依賴關系。

源碼如下:rdd 表示父 RDD,inStart 表示父 RDD 中範圍的開始,outStart 表示子 RDD 中範圍的開始,length 表示範圍的長度

@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}
           

2.5 ShuffleDependency 寬依賴,繼承Dependency

sortBy、sortByKey、reduceByKey、join、leftOuterJoin、rightOuterJoin、fullOuterJoin、distinct、cogroup、repartition、groupByKey

官方文檔中 ShuffleDependency 描述為:

/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 */
           

可以了解為:ShuffleDependency表示對 shuffle 階段的輸出的依賴。在一般的shuffle 階段都存在打亂重組這些過程,是以每個分區之間的任務執行是互相影響的,可以把每一個RDD的任務都看作一個階段,同一個父RDD階段的所有分區的任務都完成之後,才可以繼續執行子RDD的任務。請注意,在 shuffle 的情況下,RDD 是瞬态的,是以我們在 executor 端不需要它。

源碼如下:

由于該部分源碼過長,并且涉及到

shuffle

的知識,我分為幾部分來解讀。

@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false,
           

_rdd

表示父RDD,

partitioner

用于對 shuffle 輸出進行分區。

serializer

如果未明确設定,則将使用由

spark.serializer

配置選項指定的預設序列化程式。

keyOrdering

用于 RDD 的 shuffle 的鍵排序,

aggregator

用于 RDD 的 shuffle 的 mapreduce-side 聚合器,

mapSideCombine

表示是否執行部分聚合(也稱為 map-side combine)。

從這裡開始後面都是shuffleWriterProcessor,它是在 ShuffleMapTask 中控制寫入行為的處理器。

val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  extends Dependency[Product2[K, V]] with Logging {
           

以下源碼表示,

mapSideCombine

預設為false,如果開啟,需要指定aggregator聚合器,即用于 RDD 的 shuffle 的 mapreduce-side 聚合器。

if (mapSideCombine) {
    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
  }
           

以下源碼表示,shuffle都是基于PairRDD進行的,是以傳入的RDD要是

key-value

類型的,于是ShuffleDependency重寫了父類Dependency的rdd方法,把父類的RDD即_rdd轉化為

[RDD[Product2[K, V]]

的對象執行個體後傳回。

override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
           

以下源碼表示,擷取shuffleId,向shuffleManager

注冊shuffle

資訊。

private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()//擷取shuffleId

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, this)//向shuffleManager注冊shuffle資訊
           

以下源碼表示,通過後面的

canShuffleMergeBeEnabled()

方法,判斷是否允許 shuffle合并,預設情況下,如果啟用了基于推送的 shuffle,則 ShuffleDependency 允許 shuffle 合并。

private[this] val numPartitions = rdd.partitions.length

  private[this] var _shuffleMergeAllowed = canShuffleMergeBeEnabled()

  private[spark] def setShuffleMergeAllowed(shuffleMergeAllowed: Boolean): Unit = {	
    _shuffleMergeAllowed = shuffleMergeAllowed
  }

  def shuffleMergeEnabled : Boolean = shuffleMergeAllowed && mergerLocs.nonEmpty

  def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
           

以下源碼表示,

存儲

所選外部

shuffle

服務清單的位置,以在此 shuffle map 階段處理來自映射器的 shuffle

合并請求

private[spark] var mergerLocs: Seq[BlockManagerId] = Nil
           

以下源碼表示,

存儲

有關與此 shuffle 依賴項關聯的 shuffle map 階段的 shuffle

合并是否已完成

的資訊

private[this] var _shuffleMergeFinalized: Boolean = false
           

shuffleMergeId 用于唯一辨別一次不确定的stage嘗試shuffle的合并過程

private[this] var _shuffleMergeId: Int = 0

  def shuffleMergeId: Int = _shuffleMergeId

  def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
    assert(shuffleMergeAllowed)
    this.mergerLocs = mergerLocs
  }

  def getMergerLocs: Seq[BlockManagerId] = mergerLocs

  private[spark] def markShuffleMergeFinalized(): Unit = {
    _shuffleMergeFinalized = true
  }

  private[spark] def isShuffleMergeFinalizedMarked: Boolean = {
    _shuffleMergeFinalized
  }
           

以下源碼表示,如果基于推送的 shuffle

被禁用

或者此 shuffle 的 shuffle

合并已完成

,則傳回

true

def shuffleMergeFinalized: Boolean = {
    if (shuffleMergeEnabled) {
      isShuffleMergeFinalizedMarked
    } else {
      true
    }
  }
           

新的

shuffle

合并階段

def newShuffleMergeState(): Unit = {
    _shuffleMergeFinalized = false
    mergerLocs = Nil
    _shuffleMergeId += 1
    finalizeTask = None
    shufflePushCompleted.clear()
  }
           

方法

canShuffleMergeBeEnabled()

:用于前面判斷是否允許 shuffle合并

private def canShuffleMergeBeEnabled(): Boolean = {
    val isPushShuffleEnabled = Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf,
      // invoked at driver
      isDriver = true)
    if (isPushShuffleEnabled && rdd.isBarrier()) {
      logWarning("Push-based shuffle is currently not supported for barrier stages")
    }
    isPushShuffleEnabled && numPartitions > 0 &&
      // TODO: SPARK-35547: Push based shuffle is currently unsupported for Barrier stages
      !rdd.isBarrier()
  }

  @transient private[this] val shufflePushCompleted = new RoaringBitmap()
           

以下源碼表示,在跟蹤位圖中将給定的

map

任務

标記

為推送完成。使用位圖可確定由于推測或階段重試而多次啟動的同一

map

任務僅計算一次。

mapIndex

表示映射任務索引,傳回完成塊推送的映射任務數

/**
   * Mark a given map task as push completed in the tracking bitmap.
   * Using the bitmap ensures that the same map task launched multiple times due to
   * either speculation or stage retry is only counted once.
   * @param mapIndex Map task index
   * @return number of map tasks with block push completed
   */
  private[spark] def incPushCompleted(mapIndex: Int): Int = {
    shufflePushCompleted.add(mapIndex)
    shufflePushCompleted.getCardinality
  }
           

以下源碼表示,表示僅由

DAGScheduler

用于協調 shuffle 合并完成

@transient private[this] var finalizeTask: Option[ScheduledFuture[_]] = None

  private[spark] def getFinalizeTask: Option[ScheduledFuture[_]] = finalizeTask

  private[spark] def setFinalizeTask(task: ScheduledFuture[_]): Unit = {
    finalizeTask = Option(task)
  }

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
  _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}
           

從上面的分析,不難看出,在

窄依賴

中子 RDD 的每個分區資料的生成操作都是可以并行執行的,而在

寬依賴

中需要所有父 RDD 的

Shuffle

結果完成後再被執行。

繼續閱讀