天天看點

Spark送出流程源碼分析

說明

①此文主要以Yarn Cluster模式為例說明送出流程

②spark版本3.0

送出流程簡圖

Spark送出流程源碼分析

送出流程描述

①在YARN Cluster模式下,任務送出後會建立yarn用戶端yarnClient,通過用戶端和ResourceManager通訊申請啟動ApplicationMaster

②随後ResourceManager配置設定container,在合适的NodeManager上啟動ApplicationMaster。

③ApplicationMaster主線程會啟動一個子線程,命名為driver,然後阻塞主線程,等待SC(SparkContext)建立。其中driver線程負責運作應用程式中的main()方法,啟動應用程式。

④應用程式啟動後,會建立SC上下文環境(SparkContext),然後将SC傳回給主線程,并阻塞driver線程。

⑤主線程收到了SC之後結束阻塞繼續運作,向ResourceManager進行注冊并申請Executor記憶體

⑥ResourceManager接到ApplicationMaster的資源申請後會将叢集的資源清單全部發送給ApplicationMaster

⑦ApplicationMaster會根據本地化級别挑選合适的資源,并建立NodeManger用戶端,然後通過用戶端向NodeManger發送用于啟動Executor背景程序的指令

⑧ExecutorBackend程序啟動後會向driver反向注冊,并建立真正的執行對象Executor

至此計算資源準備完畢,接下來開始準備執行任務

⑨計算資源準備完畢,主線程就會喚醒driver線程,繼續執行應用程式。

⑩之後執行到Action算子時,觸發一個Job,開始劃分stage(階段),每個階段會根據RDD的分區數生成對應個數的Task,Task先會被存入任務池中,然後被發送到Exector中執行。

源碼部分

SparkSubmit

SparkSubmit類作為起點開始解讀源碼

SparkSubmit的main()方法作為程式入口,啟動程序

|--SparkSubmit main()
|--doSubmit()
	...進入doSubmit方法...
	|--parseArguments(args) //解析參數
	|--submit(appArgs, uninitLog) //帶着解析參數進行送出
	...進入submit方法...
		|--runMain(args, uninitLog)
		...進入runMain方法...
			//準備送出環境。prepareSubmitEnvironment主要作用是根據運作模式傳回不同的Application
			//yarn模式傳回的application是:org.apache.spark.deploy.yarn.YarnClusterApplication
			//其中傳回值YarnClusterApplication 指派給了變量childMainClass 
			|--val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args) 

			// 通過類名加載YarnClusterApplication 這個類
             // Class.forName("xxxxx")
             |-- mainClass = Utils.classForName(childMainClass)

			//判斷mainClass是否是SparkApplication的子類,傳回值使用變量app接收
			|--val app = classOf[SparkApplication].isAssignableFrom(mainClass)..\
			...進行條件判斷...
				//如果是SparkApplication的子類則建立SparkApplication對象
				|--mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
				//如果不是則建立JavaMainApplication的對象
				|--new JavaMainApplication(mainClass)
			
			//由上可知:變量app的實際類型是:YarnClusterApplication
			|--app.start
			...接下來執行YarnClusterApplication中的start方法
           

源碼中分析到開始執行YarnClusterApplication中的start方法,其中start方法主要做的事就是建立YarnClient(Yarn用戶端),然後通過用戶端向叢集送出應用程式。

ApplicationMaster

叢集會将用戶端送出的應用委托給ApplicationMaster來完成,所接下來從ApplicationMaster的main方法作為入口繼續分析源碼。

|--main(...)
	//scala語言中建立對象時會對類進行初始化。此處的會擷取到用戶端送出的應用程式類(如:wordcount)
	|--val amArgs = new ApplicationMasterArguments(args)
	...初始化ApplicationMasterArguments類...
		//初始化成員屬性
		var userClass: String = null
		//執行parseArgs方法,其主要作用是将應用程式類指派給成員變量userClass
		|--parseArgs(args.toList)

	|--master = new ApplicationMaster //建立ApplicationMaster對象
	|--master.run() //ApplicationMaster中的run()方法
	...進入run方法...
		|--runDriver() 
		...進入runDriver方法
			//此方法用于啟動用戶端送出的應用程式(如:wordcount)
			|--startUserApplication()
			...進入startUserApplication方法...
				//通過反射擷取到userClass類(應用程式類)中的main方法
				|--val mainMethod=userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
				|--val userThread = new Thread 	//建立了一個子線程,
				|--userThread.setName("Driver") //設定線程名字為driver,也就是說driver其實是一個線程。
				|--userThread.run()	//啟動線程。此線程專門用于啟動wordcount應用程式
				...進入run方法...
					//調用userClass類中的main方法,正式開始運作wordcount程序
					|--mainMethod.invoke(null, userArgs.toArray) 
						......wordcount代碼即我們自己根據需求寫的代碼,可以說是再熟悉不過了,主要邏輯如下
						......建立SC(SparkContext)、建立rdd、轉換算子、執行算子
						......執行算子中進行階段劃分,建立task,等等一系列的計算任務準備工作
						......不過當建立SC後,目前線程會阻塞,直到叢集資源準備完畢後,才會被喚醒。
						

			//此方法會阻塞主線程,直到driver線程中SC(SparkContext)建立完成。
			|--val sc = ThreadUtils.awaitResult
			//主線線程擷取到SC之後會結束阻塞,繼續執行
			|--val rpcEnv = sc.env.rpcEnv //建立rpcEnv通信環境
			//通過SC擷取配置資訊
			|--val userConf = sc.getConf
			|--val host = userConf.get(DRIVER_HOST_ADDRESS)
			|--val port = userConf.get(DRIVER_PORT)
			//向ResourceManager注冊AM(ApplicationMaster)
			|--registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
			//注冊成功後開始準備建立資源配置設定器
			|--createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
			...進入createAllocator方法...
				//建立資源配置設定器。這裡的client來自:client = new YarnRMClient()。
				|--allocator = client.createAllocator() 
				...進入createAllocator方法...
					//建立yarn的資源配置設定器,并傳回
					|--new YarnAllocator

				//開始準備資源。
				|--allocator.allocateResources()
				....進入allocateResources方法...
					//1.先擷取到叢集中所有Container資源,使用list集合存儲
					|--val allocatedContainers list[Container]=allocateResponse.getAllocatedContainers()
					//2.處理配置設定的資源。
					|--handleAllocatedContainers(allocatedContainers.asScala)
						//周遊資源清單,從中挑選出最适合計算的資源(根據主機、機架等本地化級别)
						|--for (allocatedContainer <- allocatedContainers){matchContainerToRequest(..)}
						|--for (allocatedContainer <- allocatedContainers){matchContainerToRequest(..)}
						....
						//資源挑選完畢之後,開始使用配置設定到的資源。參數containersToUse是挑選後資源清單
						|--runAllocatedContainers(containersToUse)
							|--for (container <- containersToUse)  //周遊資源清單(contatiners)
								//一個contatiner就會啟動一個executor線程
								|--new ExecutorRunnable.run() 
								//executor線程用于建立NMClient,與NodeManager通信
								|--nmClient = NMClient.createNMClient()
									|--startContainer()
									//prepareCommand()方法會傳回一個指令清單。
									//其中關鍵性的指令:
									//bin/java  org.apache.spark.executor.YarnCoarseGrainedExecutorBackend
										|--val commands = prepareCommand()
										//将commands中的指令通過NM用戶端發送給NM背景來執行
										//可以看出執行的指令的意思是: 運作YarnCoarseGrainedExecutorBackend程序
										|--nmClient.startContainer(commands...)
											...ExecutorBackend程序主要工作有如下兩點:
											...	①建立Executor的背景,并向driver進行注冊。
											...	②注冊成功後建立Executor,等待執行Task
											...當目前為止叢集的資源準備完畢了
			//資源準備完畢後,該執行resumeDriver方法
			//此方法主要用于喚醒driver線程
			|--resumeDriver()
      		...
           

上述源碼分析到主線程将資源準備完畢,然後喚醒driver線程。當driver喚醒之後會繼續執行應用程式(wordcount)中的建立rdd、轉換算子、執行算子等等代碼,最終劃分好階段,建立Task,然後将Task發送至exector執行。

其中階段劃分、Task的建立等源碼這裡就不多做分析了。

繼續閱讀