天天看點

第37課:Task執行内幕與結果處了解密

Task執行原理流程圖

第37課:Task執行内幕與結果處了解密

    1.當Driver中的CoarseGrainedSchedulerBackend給CoarseGrainedExecutorBackend發送LaunchTask之後,CoarseGrainedExecutorBackend在收到LaunchTask消息後,首先會反序列化TaskDescription: 

 val taskDesc = ser.deserialize[TaskDescription](data.value)
           

          2.Executor會通過會通過launchTask來執行Task:

 executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = 
     taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
           

        3.TaskRunner在ThreadPool來運作具體的Task,在TaskRunner的run方法中首先會通過調用statusUpdate給Driver發資訊彙報自己的狀态說明自己是Running狀态:

execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
           

       4.TaskRunner内部會做一些準備工作:例如反序列化Task的依賴:

Task.deserializeWithDependencies(serializedTask)
           

       然後通過網絡來擷取需要的檔案、Jar等;

 updateDependencies(taskFiles, taskJars)
           

      5.然後是反序列Task本身;

 task = ser.deserialize[Task[Any]](taskBytes,Thread.currentThread.getContextClassLoader)
           

     6.調用反序列化後的Task.run方法來執行任務并獲得執行結果

val (value, accumUpdates) = try {
          val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = attemptNumber,
            metricsSystem = env.metricsSystem)
          threwException = false
          res
         }
           

      其中Task的run方法調用的時候會導緻會導緻Task的抽象方法runTask的調用,在Task的runTask内部會調用RDD的iterator()方法,該方法就是我們針對目前Task所對應的Partition進行計算的關鍵之所在,在具體的處理内部會疊代Partition的元素并交給我們自定義的function進行處理!

      對于ShuffleMapTask,首先要對RDD以及其依賴關系進行反序列化:

val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
           

      最終計算的時候會調用RDD的compute方法:

 def compute(split: Partition, context: TaskContext): Iterator[T]
           

       具體計算的時候有具體的RDD,例如MapPartitRDD的compute:

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
      f(context, split.index, firstParent[T].iterator(split, context))
           

      對也ResultTask:

val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
          ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
        _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
        metrics = Some(context.taskMetrics)
        func(context, rdd.iterator(partition, context))
           

     其中的f就是我們在目前的Stage中計算具體Partition的業務邏輯代碼; 

     7.把執行結果序列:

 val valueBytes = resultSer.serialize(value)
           

    并根據大小判斷不同的結果傳回給Driver的方式

    8.CoarseGrainedExecutorBackend給DriverEndpoint發送StatusUpdate來傳輸執行結果,DriverEndpoint會把執行結果傳遞給TaskSchedulerImpl處理,然後交給TaskResultGetter内部通過線程去分别處理Task執行成功和失敗時候的不同情況,然後告訴DAGScheduler任務處理結束的狀況。

補充:

    1.在執行具體Task的業務邏輯前會進行四次反序列:

        a) TaskDescription的反序列化;

        b) 反序列化Task的依賴;

        c) Task的反序列化;

        d) RDD反序列化;

    2.在Spark中AkkaFrameSize是128MB,所有可以廣播非常大的任務;而任務的執行結果可以最大達到1G。

繼續閱讀