
[Review] RDD Dependencies and the Division of Tasks and Stages

Table of Contents

  • RDD Dependencies and the Division of Tasks and Stages
  • 1. RDD Lineage
  • 2. RDD Dependencies
  • 3. RDD Narrow Dependencies
  • 4. RDD Wide Dependencies
  • 5. RDD Stage Division
  • 6. RDD Task Division

RDD Dependencies and the Division of Tasks and Stages

1. RDD Lineage

An RDD only supports coarse-grained transformations, i.e., a single operation applied to a large number of records. Spark records the Lineage (the chain of operations) used to create each RDD so that lost partitions can be recovered. An RDD's Lineage records its metadata and transformation behavior; when some of the RDD's partitions are lost, Spark can use this information to recompute and recover them.

  • An RDD does not store its data.
  • To provide fault tolerance, an RDD records its relationships to the RDDs it was derived from; if an error occurs, the data can be re-read from the source and recomputed according to this lineage (a minimal sketch follows below).
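A minimal sketch of this point, assuming an existing SparkContext sc (the numbers are only for illustration): an RDD holds no data of its own, so every action walks the lineage again unless the RDD is explicitly cached.

// Assumption: sc is an existing SparkContext; the values are arbitrary.
val nums = sc.makeRDD(Seq(1, 2, 3, 4))
val doubled = nums.map { x =>
  println(s"computing $x")   // printed again on every action: nothing is stored
  x * 2
}
doubled.collect()            // first action: walks the lineage and computes
doubled.collect()            // second action: recomputes the whole chain again
// Calling doubled.cache() before the first collect() would keep the results in memory instead.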

toDebugString(): returns a description of this RDD and its recursive dependencies, for debugging.

/** A description of this RDD and its recursive dependencies for debugging. */
def toDebugString: String = {
  // Get a debug description of an rdd without its children
  def debugSelf(rdd: RDD[_]): Seq[String] = {
    import Utils.bytesToString

    val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
    val storageInfo = rdd.context.getRDDStorageInfo(_.id == rdd.id).map(info =>
      "    CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format(
        info.numCachedPartitions, bytesToString(info.memSize),
        bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize)))

    s"$rdd [$persistence]" +: storageInfo
  }
  ........      

WordCount test:

val fileRDD: RDD[String] = sc.textFile("data/t1.txt")
println(fileRDD.toDebugString)
println("----------------------")

val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
println(wordRDD.toDebugString)
println("----------------------")

val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
println(mapRDD.toDebugString)
println("----------------------")

val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
println(resultRDD.toDebugString)
resultRDD.collect()

(2) data/t1.txt MapPartitionsRDD[1] at textFile at Spark01_toDebugString.scala:10 []
 |  data/t1.txt HadoopRDD[0] at textFile at Spark01_toDebugString.scala:10 []
----------------------
(2) MapPartitionsRDD[2] at flatMap at Spark01_toDebugString.scala:14 []
 |  data/t1.txt MapPartitionsRDD[1] at textFile at Spark01_toDebugString.scala:10 []
 |  data/t1.txt HadoopRDD[0] at textFile at Spark01_toDebugString.scala:10 []
----------------------
(2) MapPartitionsRDD[3] at map at Spark01_toDebugString.scala:18 []
 |  MapPartitionsRDD[2] at flatMap at Spark01_toDebugString.scala:14 []
 |  data/t1.txt MapPartitionsRDD[1] at textFile at Spark01_toDebugString.scala:10 []
 |  data/t1.txt HadoopRDD[0] at textFile at Spark01_toDebugString.scala:10 []
----------------------
(2) ShuffledRDD[4] at reduceByKey at Spark01_toDebugString.scala:22 []
 +-(2) MapPartitionsRDD[3] at map at Spark01_toDebugString.scala:18 []
    |  MapPartitionsRDD[2] at flatMap at Spark01_toDebugString.scala:14 []
    |  data/t1.txt MapPartitionsRDD[1] at textFile at Spark01_toDebugString.scala:10 []
    |  data/t1.txt HadoopRDD[0] at textFile at Spark01_toDebugString.scala:10 []      

Note: the +-(2) prefix indicates a shuffle operation; the lineage is cut there and a new stage begins.


2. RDD Dependencies

The dependency discussed here is simply the relationship between two adjacent RDDs.

dependencies(): gets the list of dependencies of this RDD, taking into account whether the RDD is checkpointed.

/**
* Get the list of dependencies of this RDD, taking into account whether the
* RDD is checkpointed or not.
*/
final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_
  }
}      

WordCount test:

val fileRDD = sc.textFile("data/t1.txt")
println(fileRDD.dependencies)
println("----------------------")

val wordRDD = fileRDD.flatMap(_.split(" "))
println(wordRDD.dependencies)
println("----------------------")

val mapRDD = wordRDD.map((_,1))
println(mapRDD.dependencies)
println("----------------------")

val resultRDD = mapRDD.reduceByKey(_+_)
println(resultRDD.dependencies)
resultRDD.collect()

List(org.apache.spark.OneToOneDependency@6d469831)
----------------------
List(org.apache.spark.OneToOneDependency@2cc04358)
----------------------
List(org.apache.spark.OneToOneDependency@58516c91)
----------------------
List(org.apache.spark.ShuffleDependency@1907874b)      


3. RDD Narrow Dependencies

A narrow dependency means that each partition of the parent (upstream) RDD is used by at most one partition of the child (downstream) RDD. Narrow dependencies are often likened to an only child.

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)      
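For context, the surrounding definitions in the Spark source look roughly as follows (simplified; the exact shape varies by version). getParents is what makes the dependency narrow: each child partition maps back to exactly one parent partition with the same index.

// Excerpt from the Spark source (approximate).
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  // Which parent partitions a given child partition depends on.
  def getParents(partitionId: Int): Seq[Int]
  override def rdd: RDD[T] = _rdd
}

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  // Child partition i depends only on parent partition i.
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}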

Task division:


In a narrow dependency, each downstream partition depends only on the data of one parent partition, so the partitions can be computed independently of each other: one Task is created per partition and runs straight through the pipeline, as shown in the sketch below.
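A minimal sketch, assuming an existing SparkContext sc: a narrow transformation such as map keeps the parent's partition count and yields a OneToOneDependency, so no data has to move between partitions.

val src = sc.makeRDD(Seq(1, 2, 3, 4), numSlices = 2)
val doubled = src.map(_ * 2)
println(src.getNumPartitions)               // 2
println(doubled.getNumPartitions)           // 2 -- same as the parent
println(doubled.dependencies.head.getClass) // class org.apache.spark.OneToOneDependency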


4. RDD Wide Dependencies

A wide dependency means that a single partition of the parent (upstream) RDD is depended on by multiple partitions of the child (downstream) RDD, which requires a Shuffle. By analogy with narrow dependencies, wide dependencies are likened to having several children.

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   @transient private val _rdd: RDD[_ <: Product2[K, V]],
   val partitioner: Partitioner,
   val serializer: Serializer = SparkEnv.get.serializer,
   val keyOrdering: Option[Ordering[K]] = None,
   val aggregator: Option[Aggregator[K, V, C]] = None,
   val mapSideCombine: Boolean = false)
   extends Dependency[Product2[K, V]]      

Task division:


In a wide dependency, the data of each parent partition is needed by several downstream partitions, so the data has to be redistributed: each parent partition is processed by its own Task, and all of those Tasks must finish before the next step can start. This is what introduces the notion of a stage; after the shuffle, the Tasks of the new partitions take over (see the sketch below).
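A minimal sketch, again assuming an existing SparkContext sc: a shuffling operator such as reduceByKey produces a ShuffleDependency, and the child's partition count comes from the partitioner rather than from the parent.

import org.apache.spark.ShuffleDependency

val pairs = sc.makeRDD(Seq(("a", 1), ("b", 1), ("a", 1)), numSlices = 2)
val reduced = pairs.reduceByKey(_ + _, numPartitions = 3)
println(reduced.getNumPartitions)   // 3 -- set by the partitioner, not by the parent's 2
val dep = reduced.dependencies.head.asInstanceOf[ShuffleDependency[String, Int, Int]]
println(dep.partitioner)            // a HashPartitioner with 3 partitions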


5. RDD Stage Division

A DAG (Directed Acyclic Graph) is a topological graph made up of vertices and edges; the edges are directed and the graph never closes into a cycle.

The DAG records the chain of RDD transformations and the stages of the job.
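As a concrete illustration, the WordCount lineage printed in section 1 corresponds to the following DAG; the shuffle introduced by reduceByKey splits it into two stages (the stage numbering here is illustrative):

Stage 0 (ShuffleMapStage): HadoopRDD -> MapPartitionsRDD (textFile) -> MapPartitionsRDD (flatMap) -> MapPartitionsRDD (map)
        -- shuffle (reduceByKey) --
Stage 1 (ResultStage):     ShuffledRDD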


RDD stage division, in the source:


We start from the collect() operator and follow the call chain, Ctrl + clicking each method name.

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}      

Descending through a series of runJob() overloads:

  • dagScheduler.runJob(...) — runs the job through the directed acyclic graph
  • val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) — submits the job

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // dagScheduler.runJob: run the job through the DAG (directed acyclic graph)
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

 def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
  val start = System.nanoTime
  // submitJob(rdd, func, partitions, callSite, resultHandler, properties): submit the job
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}      

Under the hood, submitJob posts a JobSubmitted event to the scheduler's event loop:

val waiter = new JobWaiter(...)
eventProcessLoop.post(JobSubmitted(...))

def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}      
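The posted event is picked up by the DAGScheduler's event loop, which dispatches it to handleJobSubmitted. In the Spark source the dispatch looks roughly like this (simplified, version-dependent):

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  // the JobSubmitted event posted by submitJob ends up here
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  // ... other DAGSchedulerEvent cases ...
}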

handleJobSubmitted()

Following the call chain, we arrive at the handleJobSubmitted event handler:

  • finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) — this is where the stage division happens

private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent

// finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite): performs the stage division
private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}      

createResultStage()

  • val parents = getOrCreateParentStages(rdd, jobId) — checks whether parent stages exist, starting from the final RDD that collect passed down through the chain above
  • val stage = new ResultStage(...) — then creates a ResultStage

private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
  // check whether parent stages exist
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // create the ResultStage (the final stage)
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}      

getOrCreateParentStages()

  • getShuffleDependencies(rdd) — gets the shuffle (wide) dependencies of the RDD that was passed in
  • getOrCreateShuffleMapStage(shuffleDep, firstJobId) — each shuffle dependency is turned into a stage, and the results are collected with .toList
  • createShuffleMapStage() — val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep) creates the new stage

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
   // get the shuffle (wide) dependencies of the RDD passed in
   getShuffleDependencies(rdd).map { shuffleDep =>
     // convert each shuffle dependency into a stage
     getOrCreateShuffleMapStage(shuffleDep, firstJobId)
   }.toList
 }

private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  // a HashSet to hold the shuffle (wide) dependencies
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  // a HashSet of RDDs that have already been visited
  val visited = new HashSet[RDD[_]]
  // a Stack of RDDs waiting to be visited
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      // traverse the dependencies
      toVisit.dependencies.foreach {
        // if it is a wide (shuffle) dependency, add it to the set
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  // return the shuffle dependencies
  parents
}

private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage

    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      // create the shuffle stage
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}

def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // create the new stage
  val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // A previously run stage generated partitions for this shuffle, so for each output
    // that's still available, copy information about that output location to the new stage
    // (so we don't unnecessarily re-compute that data).
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    (0 until locs.length).foreach { i =>
      if (locs(i) ne null) {
        // locs(i) will be null if missing
        stage.addOutputLoc(i, locs(i))
      }
    }
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}      


6. RDD Task Division

Internally, RDD task division breaks down into Application, Job, Stage, and Task:

  • Application: initializing a SparkContext creates one Application;
  • Job: each Action operator produces one Job;
  • Stage: the number of Stages equals the number of wide dependencies (ShuffleDependency) plus 1;
  • Task: within a Stage, the number of Tasks equals the number of partitions of the stage's last RDD.

Note: each level of Application -> Job -> Stage -> Task is a 1-to-n relationship; a worked count for the WordCount job follows below.
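Applying these rules to the WordCount job from section 1 (two input partitions, one reduceByKey), the counts work out as follows:

Application : 1   (one SparkContext)
Job         : 1   (one action: collect)
Stage       : 2   (1 ShuffleDependency + 1)
Task        : 4   (ShuffleMapStage: 2 tasks, ResultStage: 2 tasks -- one task per partition of each stage's last RDD)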

RDD task division, in the source:

handleJobSubmitted()

  • submitStage(finalStage) — submits the final stage

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  // submit the stage --- the final stage
  submitStage(finalStage)
}      

submitStage():

  • submitMissingTasks(stage, jobId.get) — called when the stage has no missing parent stages

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    // check whether there are parent stages
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // if there are no missing parent stages
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}      

submitMissingTasks():

val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    // ShuffleMapStage 
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId)
      }
    // ResultStage 
    case stage: ResultStage =>
      // a map-like pipeline does not change the number of partitions, so one ResultTask is created per entry in partitionsToCompute
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        // create a ResultTask
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
      }
  }
}      

findMissingPartitions():

val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

override def findMissingPartitions(): Seq[Int] = {
  val job = activeJob.get
  // 0 until the number of partitions of the last RDD
  (0 until job.numPartitions).filter(id => !job.finished(id))
}      

In other words, the number of Tasks created is determined by the number of partitions of the last RDD in the stage.

Similarly, looking at the ShuffleMapStage implementation:

override def findMissingPartitions(): Seq[Int] = {
  // again 0 until numPartitions (the partition count of the stage's last RDD)
  val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)
  assert(missing.size == numPartitions - _numAvailableOutputs,
    s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
  missing
}