说明
①此文主要以Yarn Cluster模式为例说明提交流程
②spark版本3.0
提交流程简图

提交流程描述
①在YARN Cluster模式下,任务提交后会创建yarn客户端yarnClient,通过客户端和ResourceManager通讯申请启动ApplicationMaster
②随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster。
③ApplicationMaster主线程会启动一个子线程,命名为driver,然后阻塞主线程,等待SC(SparkContext)创建。其中driver线程负责运行应用程序中的main()方法,启动应用程序。
④应用程序启动后,会创建SC上下文环境(SparkContext),然后将SC返回给主线程,并阻塞driver线程。
⑤主线程收到了SC之后结束阻塞继续运行,向ResourceManager进行注册并申请Executor内存
⑥ResourceManager接到ApplicationMaster的资源申请后会将集群的资源列表全部发送给ApplicationMaster
⑦ApplicationMaster会根据本地化级别挑选合适的资源,并创建NodeManger客户端,然后通过客户端向NodeManger发送用于启动Executor后台进程的命令
⑧ExecutorBackend进程启动后会向driver反向注册,并创建真正的执行对象Executor
至此计算资源准备完毕,接下来开始准备执行任务
⑨计算资源准备完毕,主线程就会唤醒driver线程,继续执行应用程序。
⑩之后执行到Action算子时,触发一个Job,开始划分stage(阶段),每个阶段会根据RDD的分区数生成对应个数的Task,Task先会被存入任务池中,然后被发送到Exector中执行。
源码部分
SparkSubmit
SparkSubmit类作为起点开始解读源码
SparkSubmit的main()方法作为程序入口,启动进程
|--SparkSubmit main()
|--doSubmit()
...进入doSubmit方法...
|--parseArguments(args) //解析参数
|--submit(appArgs, uninitLog) //带着解析参数进行提交
...进入submit方法...
|--runMain(args, uninitLog)
...进入runMain方法...
//准备提交环境。prepareSubmitEnvironment主要作用是根据运行模式返回不同的Application
//yarn模式返回的application是:org.apache.spark.deploy.yarn.YarnClusterApplication
//其中返回值YarnClusterApplication 赋值给了变量childMainClass
|--val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
// 通过类名加载YarnClusterApplication 这个类
// Class.forName("xxxxx")
|-- mainClass = Utils.classForName(childMainClass)
//判断mainClass是否是SparkApplication的子类,返回值使用变量app接收
|--val app = classOf[SparkApplication].isAssignableFrom(mainClass)..\
...进行条件判断...
//如果是SparkApplication的子类则创建SparkApplication对象
|--mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
//如果不是则创建JavaMainApplication的对象
|--new JavaMainApplication(mainClass)
//由上可知:变量app的实际类型是:YarnClusterApplication
|--app.start
...接下来执行YarnClusterApplication中的start方法
源码中分析到开始执行YarnClusterApplication中的start方法,其中start方法主要做的事就是创建YarnClient(Yarn客户端),然后通过客户端向集群提交应用程序。
ApplicationMaster
集群会将客户端提交的应用委托给ApplicationMaster来完成,所接下来从ApplicationMaster的main方法作为入口继续分析源码。
|--main(...)
//scala语言中创建对象时会对类进行初始化。此处的会获取到客户端提交的应用程序类(如:wordcount)
|--val amArgs = new ApplicationMasterArguments(args)
...初始化ApplicationMasterArguments类...
//初始化成员属性
var userClass: String = null
//执行parseArgs方法,其主要作用是将应用程序类赋值给成员变量userClass
|--parseArgs(args.toList)
|--master = new ApplicationMaster //创建ApplicationMaster对象
|--master.run() //ApplicationMaster中的run()方法
...进入run方法...
|--runDriver()
...进入runDriver方法
//此方法用于启动客户端提交的应用程序(如:wordcount)
|--startUserApplication()
...进入startUserApplication方法...
//通过反射获取到userClass类(应用程序类)中的main方法
|--val mainMethod=userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
|--val userThread = new Thread //创建了一个子线程,
|--userThread.setName("Driver") //设置线程名字为driver,也就是说driver其实是一个线程。
|--userThread.run() //启动线程。此线程专门用于启动wordcount应用程序
...进入run方法...
//调用userClass类中的main方法,正式开始运行wordcount进程
|--mainMethod.invoke(null, userArgs.toArray)
......wordcount代码即我们自己根据需求写的代码,可以说是再熟悉不过了,主要逻辑如下
......创建SC(SparkContext)、创建rdd、转换算子、执行算子
......执行算子中进行阶段划分,创建task,等等一系列的计算任务准备工作
......不过当创建SC后,当前线程会阻塞,直到集群资源准备完毕后,才会被唤醒。
//此方法会阻塞主线程,直到driver线程中SC(SparkContext)创建完成。
|--val sc = ThreadUtils.awaitResult
//主线线程获取到SC之后会结束阻塞,继续执行
|--val rpcEnv = sc.env.rpcEnv //创建rpcEnv通信环境
//通过SC获取配置信息
|--val userConf = sc.getConf
|--val host = userConf.get(DRIVER_HOST_ADDRESS)
|--val port = userConf.get(DRIVER_PORT)
//向ResourceManager注册AM(ApplicationMaster)
|--registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
//注册成功后开始准备创建资源分配器
|--createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
...进入createAllocator方法...
//创建资源分配器。这里的client来自:client = new YarnRMClient()。
|--allocator = client.createAllocator()
...进入createAllocator方法...
//创建yarn的资源分配器,并返回
|--new YarnAllocator
//开始准备资源。
|--allocator.allocateResources()
....进入allocateResources方法...
//1.先获取到集群中所有Container资源,使用list集合存储
|--val allocatedContainers list[Container]=allocateResponse.getAllocatedContainers()
//2.处理分配的资源。
|--handleAllocatedContainers(allocatedContainers.asScala)
//遍历资源列表,从中挑选出最适合计算的资源(根据主机、机架等本地化级别)
|--for (allocatedContainer <- allocatedContainers){matchContainerToRequest(..)}
|--for (allocatedContainer <- allocatedContainers){matchContainerToRequest(..)}
....
//资源挑选完毕之后,开始使用分配到的资源。参数containersToUse是挑选后资源列表
|--runAllocatedContainers(containersToUse)
|--for (container <- containersToUse) //遍历资源列表(contatiners)
//一个contatiner就会启动一个executor线程
|--new ExecutorRunnable.run()
//executor线程用于创建NMClient,与NodeManager通信
|--nmClient = NMClient.createNMClient()
|--startContainer()
//prepareCommand()方法会返回一个命令列表。
//其中关键性的命令:
//bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend
|--val commands = prepareCommand()
//将commands中的命令通过NM客户端发送给NM后台来执行
//可以看出执行的命令的意思是: 运行YarnCoarseGrainedExecutorBackend进程
|--nmClient.startContainer(commands...)
...ExecutorBackend进程主要工作有如下两点:
... ①创建Executor的后台,并向driver进行注册。
... ②注册成功后创建Executor,等待执行Task
...当目前为止集群的资源准备完毕了
//资源准备完毕后,该执行resumeDriver方法
//此方法主要用于唤醒driver线程
|--resumeDriver()
...
上述源码分析到主线程将资源准备完毕,然后唤醒driver线程。当driver唤醒之后会继续执行应用程序(wordcount)中的创建rdd、转换算子、执行算子等等代码,最终划分好阶段,创建Task,然后将Task发送至exector执行。
其中阶段划分、Task的创建等源码这里就不多做分析了。