
Spark Source Code Analysis of Task Submission (1): The RDD#collect Method

This article analyzes the job submission flow triggered by calling an RDD's collect method. RDD#collect invokes SparkContext's runJob method, which in turn calls a chain of overloaded runJob methods, then DAGScheduler's runJob method, which finally calls submitJob. The call chain is as follows.

RDD#collect()

SparkContext#runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U)

SparkContext#runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int], allowLocal: Boolean)

SparkContext#runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean)

SparkContext#runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit)

DAGScheduler#runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties)

DAGScheduler#submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties)
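
To make the two ends of this chain concrete, here is a brief sketch, condensed from the Spark 1.x source this article follows, of RDD#collect, which kicks off the job, and the SparkContext#runJob overload that allocates the results array whose slots the resultHandler fills in.

// RDD#collect: run a job over every partition and concatenate the per-partition arrays.
def collect(): Array[T] = {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

// The final SparkContext#runJob overload allocates a results array and passes a
// resultHandler that closes over it; the DAGScheduler fills one slot per finished partition.
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
  results
}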

The submitJob method submits a job to the job scheduler. It first reads the RDD's maximum partition count to make sure no task is launched on a partition that does not exist ("Check to make sure we are not launching a task on a partition that does not exist."), then generates a jobId for the current job.

Next it creates a JobWaiter, which waits for the DAGScheduler job to complete and handles the results of finished tasks ("An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their results to the given handler function."). Its implementation is shown in Table 1; resultHandler is the function passed down from runJob, which closes over an array named results.

submitJob then posts a JobSubmitted event to eventProcessLoop, whose type is DAGSchedulerEventProcessLoop. That class extends EventLoop, which receives events and processes them on a dedicated event thread: internally it holds an eventQueue of type LinkedBlockingDeque for storing events and an eventThread that takes events from the queue and dispatches them to onReceive. The onReceive() method is implemented by DAGSchedulerEventProcessLoop; the implementations of EventLoop and DAGSchedulerEventProcessLoop#onReceive() are shown in Tables 2 and 3. Finally, submitJob returns the JobWaiter.
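
The steps just described (partition check, jobId generation, JobWaiter creation, posting JobSubmitted) correspond roughly to the following condensed sketch of DAGScheduler#submitJob, abbreviated from the Spark 1.x source this article is based on.

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  // Generate the jobId for this job.
  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // A job with zero partitions is immediately finished.
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  // Create the JobWaiter and hand the job to the event loop as a JobSubmitted event.
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
  waiter
}

The JobWaiter it returns is shown next in Table 1.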

private[spark] class JobWaiter[T](
    dagScheduler: DAGScheduler,
    val jobId: Int,
    totalTasks: Int,
    resultHandler: (Int, T) => Unit)
  extends JobListener {

  private var finishedTasks = 0

  // Is the job as a whole finished (succeeded or failed)?
  @volatile
  private var _jobFinished = totalTasks == 0

  def jobFinished = _jobFinished

  // If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero
  // partition RDDs), we set the jobResult directly to JobSucceeded.
  private var jobResult: JobResult = if (jobFinished) JobSucceeded else null

  def cancel() {
    dagScheduler.cancelJob(jobId)
  }

  // When a task succeeds, resultHandler processes its result; it closes over an array named
  // results that stores the per-partition results. Once all tasks have finished, the job is
  // marked as succeeded and notifyAll() wakes up the wait() inside awaitResult().
  override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
    if (_jobFinished) {
      throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
    }
    resultHandler(index, result.asInstanceOf[T])
    finishedTasks += 1
    if (finishedTasks == totalTasks) {
      _jobFinished = true
      jobResult = JobSucceeded
      this.notifyAll()
    }
  }

  // When the job fails, notifyAll() is called inside the synchronized block to wake up the
  // wait() inside awaitResult().
  override def jobFailed(exception: Exception): Unit = synchronized {
    _jobFinished = true
    jobResult = JobFailed(exception)
    this.notifyAll()
  }

  // If the job has not finished yet, wait() inside the synchronized block until it has.
  def awaitResult(): JobResult = synchronized {
    while (!_jobFinished) {
      this.wait()
    }
    return jobResult
  }
}

Table 1: JobWaiter implementation
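
For context on how the JobWaiter is consumed, here is a condensed sketch of DAGScheduler#runJob, based on the same Spark 1.x source: it submits the job and then blocks on awaitResult() until the JobWaiter reports success or failure.

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
  // Block the calling thread until taskSucceeded/jobFailed wakes the JobWaiter up.
  waiter.awaitResult() match {
    case JobSucceeded =>
      logInfo("Job %d finished: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case JobFailed(exception) =>
      logInfo("Job %d failed: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      throw exception
  }
}

The EventLoop that backs eventProcessLoop is shown next in Table 2.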

private[spark] abstract class EventLoop[E](name: String) extends Logging {

  // Blocking queue from java.util.concurrent used to hold pending events
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  def isActive: Boolean = eventThread.isAlive

  protected def onStart(): Unit = {}

  protected def onStop(): Unit = {}

  protected def onReceive(event: E): Unit

  protected def onError(e: Throwable): Unit
}

Table 2: EventLoop implementation
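
To illustrate the EventLoop contract in isolation, here is a small hypothetical subclass (not part of Spark; the name StringEventLoop is made up, and it assumes the class lives inside the org.apache.spark package since EventLoop is private[spark]). It follows the same pattern as DAGSchedulerEventProcessLoop: post() enqueues events, the daemon event thread drains the queue, and onReceive/onError are the two hooks a subclass must implement.

// Hypothetical illustration only, not Spark source.
class StringEventLoop extends EventLoop[String]("string-event-loop") {

  // Runs on the dedicated event thread for every event taken from eventQueue.
  override def onReceive(event: String): Unit = {
    println(s"processing event: $event")
  }

  // Invoked if onReceive throws a non-fatal exception.
  override def onError(e: Throwable): Unit = {
    println(s"event handling failed: ${e.getMessage}")
  }
}

// Usage: post() can be called from any thread; the event thread drains the queue until stop().
// val loop = new StringEventLoop
// loop.start()
// loop.post("JobSubmitted-like event")
// loop.stop()

The real subclass's onReceive, which dispatches each DAGSchedulerEvent to the corresponding handler on DAGScheduler, is shown in Table 3.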

override def onReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
      listener, properties)
  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)
  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)
  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)
  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()
  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)
  case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)
  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)
  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)
  case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
    dagScheduler.handleTaskCompletion(completion)
  case TaskSetFailed(taskSet, reason) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason)
  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}

Table 3: DAGSchedulerEventProcessLoop#onReceive() implementation

When the eventThread in DAGSchedulerEventProcessLoop processes a JobSubmitted event, it calls DAGScheduler's handleJobSubmitted method. handleJobSubmitted first creates the finalStage, which is where the job is divided into stages; it then creates an ActiveJob and updates the job bookkeeping, posts a SparkListenerJobStart event to the listenerBus, and submits the finalStage. Finally, it submits any waiting stages.
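
These steps correspond roughly to the following condensed sketch of handleJobSubmitted, abbreviated from the Spark 1.x source this article appears to be based on (the local-execution branch controlled by allowLocal and some logging are omitted here).

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: Stage = null
  try {
    // Creating the final stage recursively registers its parent shuffle stages,
    // which is where the job is divided into stages.
    finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  if (finalStage != null) {
    // Create the ActiveJob and update the job bookkeeping structures.
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    // Tell listeners the job has started, then submit the final stage.
    val stageInfos = jobIdToStageIds(jobId).toArray
      .flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, clock.getTimeMillis(), stageInfos, properties))
    submitStage(finalStage)
  }
  // Finally, submit any stages whose parents have since become available.
  submitWaitingStages()
}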
