
Spark Source Code Reading 1: Job Submission

Based on past experience, reading the source can be roughly split into:

1. Spark task scheduling -- involved in every job

2. Concrete task execution -- involves the specific algorithms

Let's start with the big framework: scheduling.

I. Overall Flow

(Figure: overall job submission flow)

II. Basic Roles (my own understanding)

Machine-level roles (Java processes visible with jps): Master, Worker, Driver, Executor

Program-internal roles: Application, Job, Stage, Task, DAGScheduler, TaskScheduler (a Task runs as threads inside an Executor, so it is not a separate JVM process)

III. The SparkPi Example

The rough processing flow:

1. SparkEnv: sets up and registers the runtime environment

2. BlockManagerMaster: registers the BlockManagerId; UI setup

3. SparkContext.runJob

4. DAGScheduler: onReceive -> handleJobSubmitted -> submitStage -> submitMissingTasks

5. DAGScheduler (line 1388) broadcasts the serialized task binary via SparkContext

6. SparkContext.broadcast

7. TaskSchedulerImpl.submitTasks

8. TaskSetManager -> Executor

9. DAGScheduler handles the task completion events that come back
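
To have something concrete to trace, here is a minimal Pi-style driver (a sketch modeled on the bundled SparkPi example; the object name MiniPi is made up):

import org.apache.spark.{SparkConf, SparkContext}

object MiniPi {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MiniPi"))
    val n = 100000
    // reduce is an action: it ends up in SparkContext.runJob,
    // which kicks off steps 3-9 above
    val count = sc.parallelize(1 to n, 2).map { _ =>
      val x = scala.util.Random.nextDouble() * 2 - 1
      val y = scala.util.Random.nextDouble() * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * count / n}")
    sc.stop()
  }
}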

Scheduling overview

(Figure: scheduling overview)

IV. DAGScheduler

The DAGScheduler is created along with the SparkContext, and SparkContext's runJob method is the entry point for everything a program runs.

(Figure: https://ask.qcloudimg.com/http-save/yehe-1154259/5uvvw15fsk.png?imageView2/2/w/1620)

Task reception (EventLoop)

Client submission (runJob)

Job splitting and stage division
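
Everything starts from an action on an RDD; for example, collect funnels straight into SparkContext.runJob (abridged from RDD.scala):

def collect(): Array[T] = withScope {
  // runJob computes every partition and returns the per-partition results
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}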

1. SparkContext.runJob

dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
           

2. dagScheduler.runJob

val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
           

3. dagScheduler.submitJob

// Create a JobWaiter with the generated jobId and the number of partitions,
// post the submission event to the event queue, and return the waiter.
// The JobWaiter listens for the job's execution status from inside the event loop.
val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
// Hand it to the event loop for processing
eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      Utils.cloneProperties(properties)))
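
Back in dagScheduler.runJob, the calling thread blocks on the waiter's future until the job finishes or fails (abridged; ThreadUtils and logInfo are DAGScheduler internals, and the exact code varies slightly across Spark versions):

val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
// Block until the JobWaiter is signalled by the event loop
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
  case scala.util.Success(_) =>
    logInfo(s"Job ${waiter.jobId} finished: ${callSite.shortForm}")
  case scala.util.Failure(exception) =>
    logInfo(s"Job ${waiter.jobId} failed: ${callSite.shortForm}")
    throw exception
}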
           

4. EventLoop.post

// Put the event into the processing queue; subclasses must implement
// onReceive to handle these events
/**
 * Put the event into the event queue. The event thread will process it later.
 */
def post(event: E): Unit = {
  if (!stopped.get) {
    if (eventThread.isAlive) {
      eventQueue.put(event)
    } else {
      onError(new IllegalStateException(s"$name has already been stopped accidentally."))
    }
  }
}
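
For intuition, the EventLoop pattern boils down to a daemon thread draining a blocking queue; a stripped-down sketch (the real class adds error handling and lifecycle hooks):

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

abstract class MiniEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          onReceive(eventQueue.take())  // take() blocks until an event arrives
        }
      } catch {
        case _: InterruptedException => // stop() interrupts a blocked take()
      }
    }
  }

  def start(): Unit = eventThread.start()

  def stop(): Unit = {
    stopped.set(true)
    eventThread.interrupt()
  }

  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
}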
           

DAGSchedulerEventProcessLoop extends EventLoop[DAGSchedulerEvent]:

/**
 * The main event loop of the DAG scheduler.
 */
override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time()
  try {
    doOnReceive(event)
  } finally {
    timerContext.stop()
  }
}
           

5. DAGSchedulerEventProcessLoop.doOnReceive

case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
           

6. DAGScheduler.handleJobSubmitted

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties): Unit = {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: BarrierJobSlotsNumberCheckFailed =>
        // If jobId doesn't exist in the map, Scala converts its value null to 0: Int automatically.
        val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
          (_: Int, value: Int) => value + 1)

        logWarning(s"Barrier stage in job $jobId requires ${e.requiredConcurrentTasks} slots, " +
          s"but only ${e.maxConcurrentTasks} are available. " +
          s"Will retry up to ${maxFailureNumTasksCheck - numCheckFailures + 1} more times")

        if (numCheckFailures <= maxFailureNumTasksCheck) {
          messageScheduler.schedule(
            new Runnable {
              override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
                partitions, callSite, listener, properties))
            },
            timeIntervalNumTasksCheck,
            TimeUnit.SECONDS
          )
          return
        } else {
          // Job failed, clear internal data.
          barrierJobIdToNumTasksCheckFailures.remove(jobId)
          listener.jobFailed(e)
          return
        }

      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    // Job submitted, clear internal data.
    barrierJobIdToNumTasksCheckFailures.remove(jobId)
    
    // Create the ActiveJob for this job
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
        Utils.cloneProperties(properties)))
    submitStage(finalStage)
  }
           
It builds the final stage via createResultStage and then news up an ActiveJob.
           

DAGScheduler.createResultStage

/**
   * Create a ResultStage associated with the provided jobId.
   */
  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    // Get the immediate shuffle dependencies (and resource profiles) of the final RDD
    val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
    val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd, resourceProfile)
    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
    val parents = getOrCreateParentStages(shuffleDeps, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
      callSite, resourceProfile.id)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }
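
The parents come from getOrCreateParentStages, which turns each immediate shuffle dependency into a (possibly reused) ShuffleMapStage (abridged; in older Spark versions this method took the RDD itself rather than the precomputed shuffle dependencies):

  private def getOrCreateParentStages(
      shuffleDeps: HashSet[ShuffleDependency[_, _, _]],
      firstJobId: Int): List[Stage] = {
    shuffleDeps.map { shuffleDep =>
      // Reuse the stage if this shuffle was already materialized, else create it
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }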
           

getShuffleDependenciesAndResourceProfiles

/**
   * Returns shuffle dependencies that are immediate parents of the given RDD and the
   * ResourceProfiles associated with the RDDs for this stage.
   *
   * This function will not return more distant ancestors for shuffle dependencies. For example,
   * if C has a shuffle dependency on B which has a shuffle dependency on A:
   *
   * A <-- B <-- C
   *
   * calling this function with rdd C will only return the B <-- C dependency.
   *
   * This function is scheduler-visible for the purpose of unit testing.
   */
  private[scheduler] def getShuffleDependenciesAndResourceProfiles(
      rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val resourceProfiles = new HashSet[ResourceProfile]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new ListBuffer[RDD[_]]
    waitingForVisit += rdd
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.remove(0)
      if (!visited(toVisit)) {
        visited += toVisit
        Option(toVisit.getResourceProfile).foreach(resourceProfiles += _)
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            waitingForVisit.prepend(dependency.rdd)
        }
      }
    }
    (parents, resourceProfiles)
  }
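
To make the "immediate parents only" rule concrete, here is a small lineage that can be built in spark-shell (a sketch; the variable names are made up):

// Build C <-- shuffle -- B <-- shuffle -- A
val a = sc.parallelize(1 to 100).map(x => (x % 10, x))
val b = a.reduceByKey(_ + _)                  // shuffle dependency: A <-- B
val c = b.map { case (_, v) => (v % 3, v) }
         .reduceByKey(_ + _)                  // shuffle dependency: B <-- C
// For c's ResultStage, only the B <-- C shuffle dependency is returned;
// the A <-- B shuffle is discovered later, when B's ShuffleMapStage is built.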
           
Back in handleJobSubmitted, the new ActiveJob is recorded:

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)

Note that listenerBus.post(SparkListenerJobStart(...)) only announces the job start to registered listeners; the actual submission happens through submitStage(finalStage).
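
submitStage walks the stage graph bottom-up: missing parent stages are submitted first, and the current stage waits until they finish (abridged from DAGScheduler; details vary by version):

private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      if (missing.isEmpty) {
        // All parents are done (or there are none): run this stage's tasks
        submitMissingTasks(stage, jobId.get)
      } else {
        // Submit parents first; this stage waits until they finish
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}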

V. Overall Job Submission Sequence Diagram

(Figure: job submission sequence diagram)

Next up: how jobs are divided into stages.
