
Spark 2.4.0 Source Code Analysis of WordCount: The Event Loop Processor (Part 3)


Sequence diagram

Main contents

  • Understand the event-handling flow of the DAG event loop processor

Source code analysis

DAGScheduler.submitJob

  • Calls DAGSchedulerEventProcessLoop.post to submit a JobSubmitted event to the event queue
/**
   * Submit an action job to the scheduler.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   *
   * @return a JobWaiter object that can be used to block until the job finishes executing
   *         or can be used to cancel the job.
   *
   * @throws IllegalArgumentException when partitions ids are illegal
   */
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
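For context, this is where every RDD action ends up: SparkContext.runJob delegates to DAGScheduler.runJob, which calls submitJob and then blocks on the returned JobWaiter until the job finishes. A minimal driver-side sketch, assuming a live SparkContext named sc and an illustrative input path:

// Hypothetical driver code: the action below flows through
// SparkContext.runJob -> DAGScheduler.runJob -> submitJob (shown above).
val rdd = sc.textFile("hdfs:///tmp/input.txt")  // illustrative path
val sizes: Array[Int] = sc.runJob(rdd, (it: Iterator[String]) => it.size)
println(s"partition sizes: ${sizes.mkString(", ")}")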

DAGSchedulerEventProcessLoop.post

  • DAGSchedulerEventProcessLoop extends EventLoop[DAGSchedulerEvent]
  • DAGSchedulerEventProcessLoop does not define its own post function, so calling post is equivalent to calling EventLoop.post()
/**
   * Put the event into the event queue. The event thread will process it later.
   */
  def post(event: E): Unit = {
    eventQueue.put(event)
  }
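Note that post only enqueues and returns; the caller never waits for the event to be processed (and since the deque is unbounded, put never blocks either). A tiny standalone sketch of the same producer/consumer queue semantics, with illustrative names rather than Spark's:

import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}

object PostSketch {
  // Unbounded deque: put() never blocks; take() blocks until an element arrives.
  private val eventQueue: BlockingQueue[String] = new LinkedBlockingDeque[String]()

  def post(event: String): Unit = eventQueue.put(event)

  def main(args: Array[String]): Unit = {
    post("JobSubmitted")
    println(eventQueue.take()) // prints "JobSubmitted"
  }
}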

EventLoop.start

  • eventProcessLoop.start() is called at the end of the DAGScheduler class body
  • DAGSchedulerEventProcessLoop does not define a start() function
  • The call therefore resolves to EventLoop.start(); in other words, EventLoop.start has already run by the time a DAGScheduler instance finishes construction
  • start() calls eventThread.start(), which triggers the thread's run() method
def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }
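The ordering here matters: onStart() runs on the calling thread before eventThread.start(), and Thread.start() establishes a happens-before edge, so anything onStart initializes is guaranteed visible inside run(). A small illustrative sketch of that guarantee (names are not Spark's):

object StartOrderSketch {
  private var initialized = false // no volatile needed: Thread.start() publishes the write

  private val worker = new Thread("sketch-event-thread") {
    override def run(): Unit = {
      // Guaranteed true: the write below happened before worker.start().
      println(s"event thread sees initialized = $initialized")
    }
  }

  def start(): Unit = {
    initialized = true // plays the role of onStart()
    worker.start()
  }

  def main(args: Array[String]): Unit = start()
}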

EventLoop

  • A linked blocking deque (LinkedBlockingDeque) holds the pending events
  • Once the event thread is started, it loops indefinitely, taking events off the blocking queue; each event is passed to EventLoop.onReceive(), which has no implementation in EventLoop itself, so the subclass implementation, DAGSchedulerEventProcessLoop.onReceive(), is invoked
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

 // Exposed for testing.
  private[spark] val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }
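Putting the pieces together: a single daemon consumer thread drains the queue, and errors are isolated per event; a NonFatal exception from onReceive is routed to onError and the loop keeps running, while only an InterruptedException (triggered by stop()) ends it. A self-contained sketch of the same pattern, simplified rather than Spark's actual class:

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Simplified re-implementation of the EventLoop pattern, for illustration only.
abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped.get) {
          val event = queue.take() // blocks while the queue is empty
          try onReceive(event)
          catch { case NonFatal(e) => onError(e) } // one bad event does not kill the loop
        }
      } catch {
        case _: InterruptedException => // stop() interrupts us; exit quietly
      }
  }

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  def stop(): Unit = { stopped.set(true); thread.interrupt() }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit = e.printStackTrace()
}

A subclass only has to supply onReceive, which is exactly what DAGSchedulerEventProcessLoop does: new MiniEventLoop[String]("demo") { def onReceive(e: String): Unit = println(e) }.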

DAGSchedulerEventProcessLoop.onReceive()

  • Calls DAGSchedulerEventProcessLoop.doOnReceive(), which pattern-matches on the event type and dispatches to the corresponding handler method
/**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }           
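The timer wrapped around doOnReceive is a Dropwizard (codahale) Metrics Timer, so the latency of every event dispatch is recorded. A minimal sketch of the same time()/stop()-in-finally pattern, assuming only the metrics-core library (the registry and metric name here are illustrative):

import com.codahale.metrics.{MetricRegistry, Timer}

object TimerSketch {
  private val registry = new MetricRegistry
  private val timer: Timer = registry.timer("messageProcessingTime")

  // Times body even when it throws, mirroring the try/finally above.
  def timed[A](body: => A): A = {
    val timerContext = timer.time()
    try body
    finally timerContext.stop()
  }

  def main(args: Array[String]): Unit = {
    timed(Thread.sleep(10))
    println(s"events timed: ${timer.getCount}")
  }
}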

DAGSchedulerEventProcessLoop.doOnReceive()

  • A JobSubmitted event is handled by dagScheduler.handleJobSubmitted()
  • The loop can dispatch all of the following event types:

1). JobSubmitted
2). MapStageSubmitted
3). StageCancelled
4). JobCancelled
5). JobGroupCancelled
6). AllJobsCancelled
7). ExecutorAdded
8). ExecutorLost
9). WorkerRemoved
10). BeginEvent
11). SpeculativeTaskSubmitted
12). GettingResultEvent
13). CompletionEvent
14). TaskSetFailed
15). ResubmitFailedStages

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val workerLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, workerLost)

    case WorkerRemoved(workerId, host, message) =>
      dagScheduler.handleWorkerRemoved(workerId, host, message)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case SpeculativeTaskSubmitted(task) =>
      dagScheduler.handleSpeculativeTaskSubmitted(task)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }
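The exhaustive match works because the events are case classes and case objects under the sealed DAGSchedulerEvent trait, so the compiler can warn about unhandled cases. A minimal sketch of the same dispatch style, with illustrative event names rather than Spark's full set:

object DispatchSketch {
  sealed trait Event
  final case class JobSubmitted(jobId: Int) extends Event
  final case class JobCancelled(jobId: Int, reason: Option[String]) extends Event
  case object ResubmitFailedStages extends Event

  // Each case delegates to a handler, as doOnReceive does above.
  def doOnReceive(event: Event): Unit = event match {
    case JobSubmitted(jobId) =>
      println(s"handling job $jobId")
    case JobCancelled(jobId, reason) =>
      println(s"cancelling job $jobId: ${reason.getOrElse("no reason given")}")
    case ResubmitFailedStages =>
      println("resubmitting failed stages")
  }

  def main(args: Array[String]): Unit = {
    doOnReceive(JobSubmitted(0))
    doOnReceive(ResubmitFailedStages)
  }
}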

end
