目錄
一、窄依賴解析
二、寬依賴解析
三、DAG生成的機制
四、DAG邏輯視圖解析
RDD依賴關系為成兩種:窄依賴(Narrow Dependency)、寬依賴(Shuffle Dependency)。窄依賴表示每個父RDD中的Partition最多被子RDD的一個Partition所使用;寬依賴表示一個父RDD的Partition都會被多個子RDD的Partition所使用。
一、窄依賴解析
RDD的窄依賴(Narrow Dependency)是RDD中最常見的依賴關系,用來表示每一個父RDD中的Partition最多被子RDD的一個Partition所使用,如下圖所示,父RDD有2~3個Partition,每一個分區都隻對應子RDD的一個Partition(join with inputs co-partitioned:對資料進行基于相同Key的數值相加)。
窄依賴分為兩類:第一類是一對一的依賴關系,在Spark中用OneToOneDependency來表示父RDD與子RDD的依賴關系是一對一的依賴關系,如map、filter、join with inputs co-partitioned;第二類是範圍依賴關系,在Spark中用RangeDependency表示,表示父RDD與子RDD的一對一的範圍内依賴關系,如union。OneToOneDependency依賴關系的Dependency.scala的源碼如下。
/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
*/
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
OneToOneDependency的getParents重寫方法引入了參數partitionId,而在具體的方法中也使用了這個參數,這表明子RDD在使用getParents方法的時候,查詢的是相同partitionId的内容。也就是說,子RDD僅僅依賴父RDD中相同partitionID的Partition。
Spark窄依賴中第二種依賴關系是RangeDependency。Dependency.scala的RangeDependency的源碼如下。
/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD
* @param outStart the start of the range in the child RDD
* @param length the length of the range
*/
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
RangeDependency和OneToOneDependency最大的差別是實作方法中出現了outStart、length、instart,子RDD在通過getParents方法查詢對應的Partition時,會根據這個partitionId減去插入時的開始ID,再加上它在父RDD中的位置ID,換而言之,就是将父RDD中的Partition,根據partitionId的順序依次插入到子RDD中。
分析完Spark中的源碼,下邊通過兩個例子來講解從執行個體角度去看RDD窄依賴輸出的結果。對于OneToOneDependency,采用map操作進行實驗,實驗代碼和結果如下所示。
val sparkSession = SparkSession.builder().master("local").appName("wordcount").getOrCreate()
val sc = sparkSession.sparkContext
sc.setLogLevel("WARN")
// val people = sparkSession.read.parquet("...").as[Person]
val num = Array(100,80,70)
val rddnum1 = sc.parallelize(num)
val mapRdd = rddnum1.map(_*2)
mapRdd.collect().foreach(println)
二、寬依賴解析
RDD的寬依賴(Shuffle Dependency)是一種會導緻計算時産生Shuffle操作的RDD操作,用來表示一個父RDD的Partition都會被多個子RDD的Partition使用,如下圖中groupByKey算子操作所示,父RDD有3個Partition,每個Partition中的資料會被子RDD中的兩個Partition使用。
寬依賴的源碼位于Dependency.scala檔案的ShuffleDependency方法中,newShuffleId()産生了新的shuffleId,表明寬依賴過程需要涉及shuffle操作,後續的代碼表示寬依賴進行時的shuffle操作需要向shuffleManager注冊資訊。Dependency.scala的ShuffleDependency的源碼如下。
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
// Note: It's possible that the combiner class tag is null, if the combineByKey
// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName)
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
// Note: It's possible that the combiner class tag is null, if the combineByKey
// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
// 注意:如果在PairRDDFunctions方法中使用combineByKeyWithClassTag,combiner類标簽可能為空
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName)
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
Spark中寬依賴關系非常常見,其中較經典的操作為GroupByKey(将輸入的key-value類型的資料進行分組,對相同key的value值進行合并,生成一個tuple2),具體代碼和操作結果如下所示。輸入5個tuple2類型的資料,通過運作産生3個tuple2資料。
val sparkSession = SparkSession.builder().master("local").appName("wordcount").getOrCreate()
val sc = sparkSession.sparkContext
sc.setLogLevel("WARN")
val data = Array(Tuple2("spark",100),Tuple2("spark",95),Tuple2("hadoop",99),Tuple2("hadoop",80),Tuple2("scala",75))
val rdd = sc.parallelize(data)
val rddGroup = rdd.groupByKey()
rddGroup.collect().foreach(println)
三、DAG生成的機制
在圖論中,如果一個有向圖無法從任意頂點出發經過若幹條邊回到該點,則這個圖是一個有向無環圖(DAG圖)。而在Spark中,由于計算過程很多時候會有先後順序,受制于某些任務必須比另一些任務較早執行的限制,我們必須對任務進行排隊,形成一個隊列的任務集合,這個隊列的任務集合就是DAG圖,每一個定點就是一個任務,每一條邊代表一種限制限制(Spark中的依賴關系)。
通過DAG,Spark可以對計算的流程進行優化,對于資料處理,可以将在單一節點上進行的計算操作進行合并,并且計算中間資料通過記憶體進行高效讀寫,對于資料處理,需要涉及Shuffle操作的步驟劃分Stage,進而使計算資源的利用更加高效和合理,減少計算資源的等待過程,減少計算中間資料讀寫産生的時間浪費(基于記憶體的高效讀寫)。
Spark中DAG生成過程的重點是對Stage的劃分,其劃分的依據是RDD的依賴關系,對于不同的依賴關系,高層排程器會進行不同的處理。對于窄依賴,RDD之間的資料不需要進行Shuffle,多個資料處理可以在同一台機器的記憶體中完成,是以窄依賴在Spark中被劃分為同一個Stage;對于寬依賴,由于Shuffle的存在,必須等到父RDD的Shuffle處理完成後,才能開始接下來的計算,是以會在此處進行Stage的切分。
在Spark中,DAG生成的流程關鍵在于回溯,在程式送出後,高層排程器将所有的RDD看成是一個Stage,然後對此Stage進行從後往前的回溯,遇到Shuffle就斷開,遇到窄依賴,則歸并到同一個Stage。等到所有的步驟回溯完成,便生成一個DAG圖。
DAG生成的相關源碼位于Spark的DAGScheduler.scala。getParentStages擷取或建立一個給定RDD的父Stages清單,getParentStages調用了getShuffleMapStage,,getShuffleMapStage調用了getAncestorShuffleDependencies,getAncestorShuffleDependencies傳回給定RDD的父節點中直接的Shuffle依賴。DAGScheduler.scala的getParentStages的源碼如下。
/**
* Get or create the list of parent stages for a given RDD. The new Stages will be created with
* the provided firstJobId.
*/
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
def visit(r: RDD[_]) {
if (!visited(r)) {
visited += r
// Kind of ugly: need to register RDDs with the cache here since
// we can't do it in its constructor because # of partitions is unknown
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
parents += getShuffleMapStage(shufDep, firstJobId)
case _ =>
waitingForVisit.push(dep.rdd)
}
}
}
}
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
parents.toList
}
DAGScheduler.scala的getShuffleMapStage的源碼如下。
/**
* Get or create a shuffle map stage for the given shuffle dependency's map side.
*/
private def getShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies
getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
if (!shuffleToMapStage.contains(dep.shuffleId)) {
shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
}
}
// Then register current shuffleDep
val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage
stage
}
}
DAGScheduler.scala的getAncestorShuffleDependencies的源碼如下。
/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
val parents = new Stack[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
def visit(r: RDD[_]) {
if (!visited(r)) {
visited += r
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
parents.push(shufDep)
}
case _ =>
}
waitingForVisit.push(dep.rdd)
}
}
}
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
parents
}
四、DAG邏輯視圖解析
下面通過一個簡單計數案例講解DAG具體的生成流程和關系。示例代碼如下。
val conf = new SparkConf()
conf.setAppName("My first spark app").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val lines = sc.textFile("./src/test3/words.txt")
// 操作一 通過flatmap形成新的MapPartitionRDD
val words = lines.flatMap(lines=>lines.split(" "))
// 操作二 通過map形成新的MapPartitionRDD
val pairs = words.map(word=>(word,1))
// 操作三 reduceByKey(包含兩步reduce)
// 此步驟生成MapPartitionRDD和ShuffleRDD
val WordCounts = pairs.reduceByKey(_+_)
WordCounts.collect().foreach(println)
println(pairs.toDebugString) // 通過toDebugString檢視RDD的譜系
println("====================================================")
println(WordCounts.toDebugString)
println("====================================================")
sc.stop()
具體解釋為:在程式正式運作前,Spark的DAG排程器會将整個流程設定為一個Stage,此Stage包含3個操作,5個RDD,分别為MapPartitionRDD(讀取檔案資料時)、MapPartitionRDD(flatMap操作)、MapPartitionRDD(map操作)、MapPartitionRDD(reduceByKey的local段的操作)、ShuffleRDD(reduceByKeyshuffle操作)。
(1)回溯整個流程,在shuffleRDD與MapPartitionRDD(reduceByKey的local段的操作)中存在shuffle操作,整個RDD先在此切開,形成兩個Stage。
(2)繼續向前回溯,MapPartitionRDD(reduceByKey的local段的操作)與MapPartitionRDD (map操作)中間不存在Shuffle(即兩個RDD的依賴關系為窄依賴),歸為同一個Stage。
(3)繼續回溯,發現往前的所有的RDD之間都不存在Shuffle,應歸為同一個Stage。
(4)回溯完成,形成DAG,由兩個Stage構成:
第一個Stage由MapPartitionRDD(讀取檔案資料時)、MapPartitionRDD(flatMap操作)、MapPartitionRDD(map操作)、MapPartitionRDD(reduceByKey的local段的操作)構成。第二個Stage由ShuffleRDD(reduceByKey Shuffle操作)構成。