Task執行原理流程圖
1.當Driver中的CoarseGrainedSchedulerBackend給CoarseGrainedExecutorBackend發送LaunchTask之後,CoarseGrainedExecutorBackend在收到LaunchTask消息後,首先會反序列化TaskDescription:
val taskDesc = ser.deserialize[TaskDescription](data.value)
2.Executor會通過會通過launchTask來執行Task:
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber =
taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
3.TaskRunner在ThreadPool來運作具體的Task,在TaskRunner的run方法中首先會通過調用statusUpdate給Driver發資訊彙報自己的狀态說明自己是Running狀态:
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
4.TaskRunner内部會做一些準備工作:例如反序列化Task的依賴:
Task.deserializeWithDependencies(serializedTask)
然後通過網絡來擷取需要的檔案、Jar等;
updateDependencies(taskFiles, taskJars)
5.然後是反序列Task本身;
task = ser.deserialize[Task[Any]](taskBytes,Thread.currentThread.getContextClassLoader)
6.調用反序列化後的Task.run方法來執行任務并獲得執行結果
val (value, accumUpdates) = try {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
res
}
其中Task的run方法調用的時候會導緻會導緻Task的抽象方法runTask的調用,在Task的runTask内部會調用RDD的iterator()方法,該方法就是我們針對目前Task所對應的Partition進行計算的關鍵之所在,在具體的處理内部會疊代Partition的元素并交給我們自定義的function進行處理!
對于ShuffleMapTask,首先要對RDD以及其依賴關系進行反序列化:
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
最終計算的時候會調用RDD的compute方法:
def compute(split: Partition, context: TaskContext): Iterator[T]
具體計算的時候有具體的RDD,例如MapPartitRDD的compute:
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
對也ResultTask:
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))
其中的f就是我們在目前的Stage中計算具體Partition的業務邏輯代碼;
7.把執行結果序列:
val valueBytes = resultSer.serialize(value)
并根據大小判斷不同的結果傳回給Driver的方式
8.CoarseGrainedExecutorBackend給DriverEndpoint發送StatusUpdate來傳輸執行結果,DriverEndpoint會把執行結果傳遞給TaskSchedulerImpl處理,然後交給TaskResultGetter内部通過線程去分别處理Task執行成功和失敗時候的不同情況,然後告訴DAGScheduler任務處理結束的狀況。
補充:
1.在執行具體Task的業務邏輯前會進行四次反序列:
a) TaskDescription的反序列化;
b) 反序列化Task的依賴;
c) Task的反序列化;
d) RDD反序列化;
2.在Spark中AkkaFrameSize是128MB,所有可以廣播非常大的任務;而任務的執行結果可以最大達到1G。