根据之前的经验,源码阅读大致可分为:
1、Spark任务调度 -- 每个任务都会用到
2、具体的task执行 -- 涉及具体的算法
先从大框架——调度开始。
一、大流程
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5yM1I2MzETYiZDZ0ADMhF2MmZ2Y5YjY2IGN4YjMyIzNw8CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
二、基本角色(自己理解)
机器角色(jps 能够看到的 Java 进程):Master、Worker、Driver、Executor(Executor 进程在 jps 中通常显示为 CoarseGrainedExecutorBackend);Task 并不是独立进程,而是运行在 Executor 进程内线程中的执行单元
程序内部角色:Application、Job、Stage、DAGScheduler、TaskScheduler
三、Spark Pi的示例
大致经过的处理流程:
1.SparkEnv 创建运行时环境(注册序列化器、BlockManager、MapOutputTracker 等组件)
2.BlockManagerMaster 注册BlockManagerId,UI
3.SparkContext runJob
4.DAGScheduler onReceive->handleJobSubmitted->submitStage->submitMissingTasks
5.DAGScheduler 在 submitMissingTasks 中(源码约第 1388 行)调用 SparkContext 广播序列化后的任务数据(taskBinary)
6.SparkContext broadcast
7.TaskSchedulerImpl submitTasks
8.TaskSetManager->Executor
9.DAGScheduler
调度总览
四、DAGScheduler
DAGScheduler 随着 SparkContext 的创建而创建,而 SparkContext 类的 runJob 方法是所有 Job 运行的入口(action 算子最终都会调用它)。
![](https://ask.qcloudimg.com/http-save/yehe-1154259/5uvvw15fsk.png?imageView2/2/w/1620)
任务接收(EventLoop)
客户端提交(runJob)
Job切分及Stage划分
1.SparkContext.runJob
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
2.dagScheduler.runJob
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
3.dagScheduler.submitJob
// 创建一个 JobWaiter(Job 等待者),以生成的 jobId 和分区数量等作为参数;随后将 JobSubmitted 事件放入事件队列,并返回 waiter 用于等待结果
// JobWaiter 作为 JobListener 监听 Job 的执行状态
val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
// 交给事件监听者处理
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
Utils.cloneProperties(properties)))
4.EventLoop.post
// 放入处理队列,子类需要有onReceive方法处理这些event
/**
 * Hands `event` to the event queue so the event thread can process it later.
 *
 * Events posted after the loop has been stopped are silently dropped. The loop may not be
 * stopped while its event thread is no longer alive. In that case the event is not queued;
 * `onError` receives an `IllegalStateException` instead.
 */
def post(event: E): Unit = {
  val acceptingEvents = !stopped.get
  if (acceptingEvents) {
    if (!eventThread.isAlive) {
      onError(new IllegalStateException(s"$name has already been stopped accidentally."))
    } else {
      eventQueue.put(event)
    }
  }
}
DAGSchedulerEventProcessLoop 继承自 EventLoop[DAGSchedulerEvent],并实现了 onReceive 方法来处理队列中的事件:
/**
 * The main event loop of the DAG scheduler.
 *
 * Delegates every event to `doOnReceive` and times the call with `timer`. The timing context
 * is stopped even when handling the event throws.
 */
override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timing = timer.time()
  try doOnReceive(event)
  finally timing.stop()
}
5.DAGSchedulerEventProcessLoop.doOnReceive(由 onReceive 调用)
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
6.DAGScheduler.handleJobSubmitted
/**
 * Handles a `JobSubmitted` event for job `jobId`.
 *
 * Creates the final ResultStage for `finalRDD` and registers a new ActiveJob for it. It then
 * posts a SparkListenerJobStart event to the listener bus and finally submits the final stage.
 *
 * If stage creation throws BarrierJobSlotsNumberCheckFailed, the same JobSubmitted event is
 * re-posted after `timeIntervalNumTasksCheck` seconds. This repeats while this job's failure
 * count is at most `maxFailureNumTasksCheck`. Once that limit is exceeded, or if stage creation
 * throws any other Exception, `listener.jobFailed` is called and the method returns without
 * submitting anything.
 *
 * @param jobId      id of the job being submitted
 * @param finalRDD   RDD the final (result) stage is built on
 * @param func       function applied to each computed partition
 * @param partitions partition indices of `finalRDD` to compute
 * @param callSite   call site passed to the result stage and the ActiveJob
 * @param listener   notified of failures here and attached to the ActiveJob
 * @param properties job properties; a clone is attached to the SparkListenerJobStart event
 */
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  // Stays null only on the error paths below, all of which return early.
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: BarrierJobSlotsNumberCheckFailed =>
      // If jobId doesn't exist in the map, Scala converts its value null to 0: Int automatically.
      val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
        (_: Int, value: Int) => value + 1)
      logWarning(s"Barrier stage in job $jobId requires ${e.requiredConcurrentTasks} slots, " +
        s"but only ${e.maxConcurrentTasks} are available. " +
        s"Will retry up to ${maxFailureNumTasksCheck - numCheckFailures + 1} more times")
      if (numCheckFailures <= maxFailureNumTasksCheck) {
        // Retry later: re-post the identical JobSubmitted event after the configured delay.
        messageScheduler.schedule(
          new Runnable {
            override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
              partitions, callSite, listener, properties))
          },
          timeIntervalNumTasksCheck,
          TimeUnit.SECONDS
        )
        return
      } else {
        // Job failed, clear internal data.
        barrierJobIdToNumTasksCheckFailures.remove(jobId)
        listener.jobFailed(e)
        return
      }
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  // Job submitted, clear internal data.
  barrierJobIdToNumTasksCheckFailures.remove(jobId)
  // Create the ActiveJob that ties this job id to its final stage.
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  val jobSubmissionTime = clock.getTimeMillis()
  // Register the job as active and link it to its final stage.
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  // Latest StageInfo of every stage registered for this job, reported in the job-start event.
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
      Utils.cloneProperties(properties)))
  // Hand the final stage to submitStage. NOTE(review): submitStage presumably schedules any
  // missing parent stages first — confirm.
  submitStage(finalStage)
}
handleJobSubmitted 先调用 createResultStage 创建最终的 ResultStage,再 new 出一个 ActiveJob
DAGScheduler.createResultStage
/**
 * Create a ResultStage associated with the provided jobId.
 *
 * Steps, in order:
 * - collect the immediate shuffle dependencies and the resource profiles of `rdd`;
 * - merge the profiles into a single one;
 * - run the barrier-stage checks;
 * - get or create the parent stages from the shuffle dependencies;
 * - register the new stage under a freshly allocated stage id.
 *
 * @param rdd        the final RDD of the job
 * @param func       function to run on each partition
 * @param partitions partition indices of `rdd` to compute
 * @param jobId      id of the job that owns the stage
 * @param callSite   call site recorded on the stage
 * @return the newly created and registered ResultStage
 */
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // Immediate shuffle dependencies (the boundaries to parent stages) and the resource
  // profiles of the RDDs that belong to this stage.
  val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
  val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
  // Barrier-stage validation, done before any stage id is allocated. NOTE(review): these
  // presumably throw when the barrier stage cannot run (callers such as handleJobSubmitted
  // catch BarrierJobSlotsNumberCheckFailed around this method) — confirm.
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd, resourceProfile)
  // partitions.toSet.size is the number of distinct partitions requested.
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  // Parent stages are derived from the shuffle dependencies collected above.
  val parents = getOrCreateParentStages(shuffleDeps, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
    callSite, resourceProfile.id)
  // Register the stage by id and update the job <-> stage id maps.
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
getShuffleDependenciesAndResourceProfiles
/**
 * Returns shuffle dependencies that are immediate parents of the given RDD and the
 * ResourceProfiles associated with the RDDs for this stage.
 *
 * The lineage of `rdd` is walked through narrow dependencies only. Every ShuffleDependency
 * met on the way is recorded but not followed, so more distant ancestors are never returned.
 * For example, if C has a shuffle dependency on B which has a shuffle dependency on A:
 *
 *   A <-- B <-- C
 *
 * calling this function with rdd C will only return the B <-- C dependency.
 *
 * The walk uses an explicit work list instead of recursion.
 *
 * This function is scheduler-visible for the purpose of unit testing.
 */
private[scheduler] def getShuffleDependenciesAndResourceProfiles(
    rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = {
  val shuffleParents = new HashSet[ShuffleDependency[_, _, _]]
  val stageProfiles = new HashSet[ResourceProfile]
  val seen = new HashSet[RDD[_]]
  // Work list used as a stack (LIFO): narrow parents are pushed onto its front.
  var pending: List[RDD[_]] = List(rdd)
  while (pending.nonEmpty) {
    val current = pending.head
    pending = pending.tail
    if (!seen(current)) {
      seen += current
      // The profile is wrapped in Option, so a null profile is simply skipped.
      Option(current.getResourceProfile).foreach(stageProfiles += _)
      for (dep <- current.dependencies) {
        dep match {
          case shuffle: ShuffleDependency[_, _, _] =>
            shuffleParents += shuffle
          case narrow =>
            pending = narrow.rdd :: pending
        }
      }
    }
  }
  (shuffleParents, stageProfiles)
}
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
随后通过 listenerBus.post 发布 SparkListenerJobStart 事件通知各监听器;真正的提交由最后的 submitStage(finalStage) 完成
五、Job提交整体时序图
下一步,Job stage的划分