說明
①此文主要以Yarn Cluster模式為例說明送出流程
②spark版本3.0
送出流程簡圖

送出流程描述
①在YARN Cluster模式下,任務送出後會建立yarn用戶端yarnClient,通過用戶端和ResourceManager通訊申請啟動ApplicationMaster
②随後ResourceManager配置設定container,在合适的NodeManager上啟動ApplicationMaster。
③ApplicationMaster主線程會啟動一個子線程,命名為driver,然後阻塞主線程,等待SC(SparkContext)建立。其中driver線程負責運作應用程式中的main()方法,啟動應用程式。
④應用程式啟動後,會建立SC上下文環境(SparkContext),然後将SC傳回給主線程,并阻塞driver線程。
⑤主線程收到了SC之後結束阻塞繼續運作,向ResourceManager進行注冊并申請Executor記憶體
⑥ResourceManager接到ApplicationMaster的資源申請後會将叢集的資源清單全部發送給ApplicationMaster
⑦ApplicationMaster會根據本地化級别挑選合适的資源,并建立NodeManger用戶端,然後通過用戶端向NodeManger發送用于啟動Executor背景程序的指令
⑧ExecutorBackend程序啟動後會向driver反向注冊,并建立真正的執行對象Executor
至此計算資源準備完畢,接下來開始準備執行任務
⑨計算資源準備完畢,主線程就會喚醒driver線程,繼續執行應用程式。
⑩之後執行到Action算子時,觸發一個Job,開始劃分stage(階段),每個階段會根據RDD的分區數生成對應個數的Task,Task先會被存入任務池中,然後被發送到Exector中執行。
源碼部分
SparkSubmit
SparkSubmit類作為起點開始解讀源碼
SparkSubmit的main()方法作為程式入口,啟動程序
|--SparkSubmit main()
|--doSubmit()
...進入doSubmit方法...
|--parseArguments(args) //解析參數
|--submit(appArgs, uninitLog) //帶着解析參數進行送出
...進入submit方法...
|--runMain(args, uninitLog)
...進入runMain方法...
//準備送出環境。prepareSubmitEnvironment主要作用是根據運作模式傳回不同的Application
//yarn模式傳回的application是:org.apache.spark.deploy.yarn.YarnClusterApplication
//其中傳回值YarnClusterApplication 指派給了變量childMainClass
|--val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
// 通過類名加載YarnClusterApplication 這個類
// Class.forName("xxxxx")
|-- mainClass = Utils.classForName(childMainClass)
//判斷mainClass是否是SparkApplication的子類,傳回值使用變量app接收
|--val app = classOf[SparkApplication].isAssignableFrom(mainClass)..\
...進行條件判斷...
//如果是SparkApplication的子類則建立SparkApplication對象
|--mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
//如果不是則建立JavaMainApplication的對象
|--new JavaMainApplication(mainClass)
//由上可知:變量app的實際類型是:YarnClusterApplication
|--app.start
...接下來執行YarnClusterApplication中的start方法
源碼中分析到開始執行YarnClusterApplication中的start方法,其中start方法主要做的事就是建立YarnClient(Yarn用戶端),然後通過用戶端向叢集送出應用程式。
ApplicationMaster
叢集會将用戶端送出的應用委托給ApplicationMaster來完成,所接下來從ApplicationMaster的main方法作為入口繼續分析源碼。
|--main(...)
//scala語言中建立對象時會對類進行初始化。此處的會擷取到用戶端送出的應用程式類(如:wordcount)
|--val amArgs = new ApplicationMasterArguments(args)
...初始化ApplicationMasterArguments類...
//初始化成員屬性
var userClass: String = null
//執行parseArgs方法,其主要作用是将應用程式類指派給成員變量userClass
|--parseArgs(args.toList)
|--master = new ApplicationMaster //建立ApplicationMaster對象
|--master.run() //ApplicationMaster中的run()方法
...進入run方法...
|--runDriver()
...進入runDriver方法
//此方法用于啟動用戶端送出的應用程式(如:wordcount)
|--startUserApplication()
...進入startUserApplication方法...
//通過反射擷取到userClass類(應用程式類)中的main方法
|--val mainMethod=userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
|--val userThread = new Thread //建立了一個子線程,
|--userThread.setName("Driver") //設定線程名字為driver,也就是說driver其實是一個線程。
|--userThread.run() //啟動線程。此線程專門用于啟動wordcount應用程式
...進入run方法...
//調用userClass類中的main方法,正式開始運作wordcount程序
|--mainMethod.invoke(null, userArgs.toArray)
......wordcount代碼即我們自己根據需求寫的代碼,可以說是再熟悉不過了,主要邏輯如下
......建立SC(SparkContext)、建立rdd、轉換算子、執行算子
......執行算子中進行階段劃分,建立task,等等一系列的計算任務準備工作
......不過當建立SC後,目前線程會阻塞,直到叢集資源準備完畢後,才會被喚醒。
//此方法會阻塞主線程,直到driver線程中SC(SparkContext)建立完成。
|--val sc = ThreadUtils.awaitResult
//主線線程擷取到SC之後會結束阻塞,繼續執行
|--val rpcEnv = sc.env.rpcEnv //建立rpcEnv通信環境
//通過SC擷取配置資訊
|--val userConf = sc.getConf
|--val host = userConf.get(DRIVER_HOST_ADDRESS)
|--val port = userConf.get(DRIVER_PORT)
//向ResourceManager注冊AM(ApplicationMaster)
|--registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
//注冊成功後開始準備建立資源配置設定器
|--createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
...進入createAllocator方法...
//建立資源配置設定器。這裡的client來自:client = new YarnRMClient()。
|--allocator = client.createAllocator()
...進入createAllocator方法...
//建立yarn的資源配置設定器,并傳回
|--new YarnAllocator
//開始準備資源。
|--allocator.allocateResources()
....進入allocateResources方法...
//1.先擷取到叢集中所有Container資源,使用list集合存儲
|--val allocatedContainers list[Container]=allocateResponse.getAllocatedContainers()
//2.處理配置設定的資源。
|--handleAllocatedContainers(allocatedContainers.asScala)
//周遊資源清單,從中挑選出最适合計算的資源(根據主機、機架等本地化級别)
|--for (allocatedContainer <- allocatedContainers){matchContainerToRequest(..)}
|--for (allocatedContainer <- allocatedContainers){matchContainerToRequest(..)}
....
//資源挑選完畢之後,開始使用配置設定到的資源。參數containersToUse是挑選後資源清單
|--runAllocatedContainers(containersToUse)
|--for (container <- containersToUse) //周遊資源清單(contatiners)
//一個contatiner就會啟動一個executor線程
|--new ExecutorRunnable.run()
//executor線程用于建立NMClient,與NodeManager通信
|--nmClient = NMClient.createNMClient()
|--startContainer()
//prepareCommand()方法會傳回一個指令清單。
//其中關鍵性的指令:
//bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend
|--val commands = prepareCommand()
//将commands中的指令通過NM用戶端發送給NM背景來執行
//可以看出執行的指令的意思是: 運作YarnCoarseGrainedExecutorBackend程序
|--nmClient.startContainer(commands...)
...ExecutorBackend程序主要工作有如下兩點:
... ①建立Executor的背景,并向driver進行注冊。
... ②注冊成功後建立Executor,等待執行Task
...當目前為止叢集的資源準備完畢了
//資源準備完畢後,該執行resumeDriver方法
//此方法主要用于喚醒driver線程
|--resumeDriver()
...
上述源碼分析到主線程将資源準備完畢,然後喚醒driver線程。當driver喚醒之後會繼續執行應用程式(wordcount)中的建立rdd、轉換算子、執行算子等等代碼,最終劃分好階段,建立Task,然後将Task發送至exector執行。
其中階段劃分、Task的建立等源碼這裡就不多做分析了。