天天看点

Spark提交流程源码分析

说明

①此文主要以Yarn Cluster模式为例说明提交流程

②spark版本3.0

提交流程简图

Spark提交流程源码分析

提交流程描述

①在YARN Cluster模式下,任务提交后会创建yarn客户端yarnClient,通过客户端和ResourceManager通讯申请启动ApplicationMaster

②随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster。

③ApplicationMaster主线程会启动一个子线程,命名为driver,然后阻塞主线程,等待SC(SparkContext)创建。其中driver线程负责运行应用程序中的main()方法,启动应用程序。

④应用程序启动后,会创建SC上下文环境(SparkContext),然后将SC返回给主线程,并阻塞driver线程。

⑤主线程收到了SC之后结束阻塞继续运行,向ResourceManager进行注册并申请Executor内存

⑥ResourceManager接到ApplicationMaster的资源申请后会将集群的资源列表全部发送给ApplicationMaster

⑦ApplicationMaster会根据本地化级别挑选合适的资源,并创建NodeManger客户端,然后通过客户端向NodeManger发送用于启动Executor后台进程的命令

⑧ExecutorBackend进程启动后会向driver反向注册,并创建真正的执行对象Executor

至此计算资源准备完毕,接下来开始准备执行任务

⑨计算资源准备完毕,主线程就会唤醒driver线程,继续执行应用程序。

⑩之后执行到Action算子时,触发一个Job,开始划分stage(阶段),每个阶段会根据RDD的分区数生成对应个数的Task,Task先会被存入任务池中,然后被发送到Exector中执行。

源码部分

SparkSubmit

SparkSubmit类作为起点开始解读源码

SparkSubmit的main()方法作为程序入口,启动进程

|--SparkSubmit main()
|--doSubmit()
	...进入doSubmit方法...
	|--parseArguments(args) //解析参数
	|--submit(appArgs, uninitLog) //带着解析参数进行提交
	...进入submit方法...
		|--runMain(args, uninitLog)
		...进入runMain方法...
			//准备提交环境。prepareSubmitEnvironment主要作用是根据运行模式返回不同的Application
			//yarn模式返回的application是:org.apache.spark.deploy.yarn.YarnClusterApplication
			//其中返回值YarnClusterApplication 赋值给了变量childMainClass 
			|--val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args) 

			// 通过类名加载YarnClusterApplication 这个类
             // Class.forName("xxxxx")
             |-- mainClass = Utils.classForName(childMainClass)

			//判断mainClass是否是SparkApplication的子类,返回值使用变量app接收
			|--val app = classOf[SparkApplication].isAssignableFrom(mainClass)..\
			...进行条件判断...
				//如果是SparkApplication的子类则创建SparkApplication对象
				|--mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
				//如果不是则创建JavaMainApplication的对象
				|--new JavaMainApplication(mainClass)
			
			//由上可知:变量app的实际类型是:YarnClusterApplication
			|--app.start
			...接下来执行YarnClusterApplication中的start方法
           

源码中分析到开始执行YarnClusterApplication中的start方法,其中start方法主要做的事就是创建YarnClient(Yarn客户端),然后通过客户端向集群提交应用程序。

ApplicationMaster

集群会将客户端提交的应用委托给ApplicationMaster来完成,所接下来从ApplicationMaster的main方法作为入口继续分析源码。

|--main(...)
	//scala语言中创建对象时会对类进行初始化。此处的会获取到客户端提交的应用程序类(如:wordcount)
	|--val amArgs = new ApplicationMasterArguments(args)
	...初始化ApplicationMasterArguments类...
		//初始化成员属性
		var userClass: String = null
		//执行parseArgs方法,其主要作用是将应用程序类赋值给成员变量userClass
		|--parseArgs(args.toList)

	|--master = new ApplicationMaster //创建ApplicationMaster对象
	|--master.run() //ApplicationMaster中的run()方法
	...进入run方法...
		|--runDriver() 
		...进入runDriver方法
			//此方法用于启动客户端提交的应用程序(如:wordcount)
			|--startUserApplication()
			...进入startUserApplication方法...
				//通过反射获取到userClass类(应用程序类)中的main方法
				|--val mainMethod=userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
				|--val userThread = new Thread 	//创建了一个子线程,
				|--userThread.setName("Driver") //设置线程名字为driver,也就是说driver其实是一个线程。
				|--userThread.run()	//启动线程。此线程专门用于启动wordcount应用程序
				...进入run方法...
					//调用userClass类中的main方法,正式开始运行wordcount进程
					|--mainMethod.invoke(null, userArgs.toArray) 
						......wordcount代码即我们自己根据需求写的代码,可以说是再熟悉不过了,主要逻辑如下
						......创建SC(SparkContext)、创建rdd、转换算子、执行算子
						......执行算子中进行阶段划分,创建task,等等一系列的计算任务准备工作
						......不过当创建SC后,当前线程会阻塞,直到集群资源准备完毕后,才会被唤醒。
						

			//此方法会阻塞主线程,直到driver线程中SC(SparkContext)创建完成。
			|--val sc = ThreadUtils.awaitResult
			//主线线程获取到SC之后会结束阻塞,继续执行
			|--val rpcEnv = sc.env.rpcEnv //创建rpcEnv通信环境
			//通过SC获取配置信息
			|--val userConf = sc.getConf
			|--val host = userConf.get(DRIVER_HOST_ADDRESS)
			|--val port = userConf.get(DRIVER_PORT)
			//向ResourceManager注册AM(ApplicationMaster)
			|--registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
			//注册成功后开始准备创建资源分配器
			|--createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
			...进入createAllocator方法...
				//创建资源分配器。这里的client来自:client = new YarnRMClient()。
				|--allocator = client.createAllocator() 
				...进入createAllocator方法...
					//创建yarn的资源分配器,并返回
					|--new YarnAllocator

				//开始准备资源。
				|--allocator.allocateResources()
				....进入allocateResources方法...
					//1.先获取到集群中所有Container资源,使用list集合存储
					|--val allocatedContainers list[Container]=allocateResponse.getAllocatedContainers()
					//2.处理分配的资源。
					|--handleAllocatedContainers(allocatedContainers.asScala)
						//遍历资源列表,从中挑选出最适合计算的资源(根据主机、机架等本地化级别)
						|--for (allocatedContainer <- allocatedContainers){matchContainerToRequest(..)}
						|--for (allocatedContainer <- allocatedContainers){matchContainerToRequest(..)}
						....
						//资源挑选完毕之后,开始使用分配到的资源。参数containersToUse是挑选后资源列表
						|--runAllocatedContainers(containersToUse)
							|--for (container <- containersToUse)  //遍历资源列表(contatiners)
								//一个contatiner就会启动一个executor线程
								|--new ExecutorRunnable.run() 
								//executor线程用于创建NMClient,与NodeManager通信
								|--nmClient = NMClient.createNMClient()
									|--startContainer()
									//prepareCommand()方法会返回一个命令列表。
									//其中关键性的命令:
									//bin/java  org.apache.spark.executor.YarnCoarseGrainedExecutorBackend
										|--val commands = prepareCommand()
										//将commands中的命令通过NM客户端发送给NM后台来执行
										//可以看出执行的命令的意思是: 运行YarnCoarseGrainedExecutorBackend进程
										|--nmClient.startContainer(commands...)
											...ExecutorBackend进程主要工作有如下两点:
											...	①创建Executor的后台,并向driver进行注册。
											...	②注册成功后创建Executor,等待执行Task
											...当目前为止集群的资源准备完毕了
			//资源准备完毕后,该执行resumeDriver方法
			//此方法主要用于唤醒driver线程
			|--resumeDriver()
      		...
           

上述源码分析到主线程将资源准备完毕,然后唤醒driver线程。当driver唤醒之后会继续执行应用程序(wordcount)中的创建rdd、转换算子、执行算子等等代码,最终划分好阶段,创建Task,然后将Task发送至exector执行。

其中阶段划分、Task的创建等源码这里就不多做分析了。

继续阅读