天天看點

SparkConf加載與SparkContext建立(源碼閱讀四)

  sparkContext建立還沒完呢,緊接着前兩天,我們繼續探索。。作死。。。

  緊接着前幾天我們繼續SparkContext的建立:

SparkConf加載與SparkContext建立(源碼閱讀四)

 

SparkConf加載與SparkContext建立(源碼閱讀四)

  接下來從這裡我們可以看到,spark開始加載hadoop的配置資訊,第二張圖中 new出來的Configuration正是hadoop的Configuration。同時,将所有sparkConf中所有以spark.hadoop.開頭的屬性都複制到了Hadoop的Configuration.同時又将spark.buffer.size複制為Hadoop的Configuration的配置的Io.file.buffer.size.随之加載相關jar包。再下來,我們可以看到:

SparkConf加載與SparkContext建立(源碼閱讀四)

  我們可以看到,将所有的executor的環境變量加載于_executorMemory以及executorEnvs,後續應該在注冊executor時進行調用。随之建立_taskScheduler:

  

SparkConf加載與SparkContext建立(源碼閱讀四)

  那麼我們深入看下createTaskScheduler的過程:

SparkConf加載與SparkContext建立(源碼閱讀四)
SparkConf加載與SparkContext建立(源碼閱讀四)

  這裡可以看到,它幹了很多變态的事情,那麼先說下,什麼是TaskScheduler呢?TaskScheduler負責任務的送出,并且請求叢集管理器對任務排程。TaskScheduler也可以看做任務排程的用戶端。那麼createTaskScheduler會根據master的配置(master match),比對部署模式,利用反射建立yarn-cluster(本例圖中為local及yarn-cluster),随之initialize了CoarseGrainedSchedulerBackend。(以後再深入了解CoarseGrainedSchedulerBackend)

  代碼中可以看到,建立了TaskSchedulerImpl,它是什麼呢?

SparkConf加載與SparkContext建立(源碼閱讀四)

  它從SparkConf中讀取配置資訊,包括每個任務配置設定的CPU數,失敗task重試次數(可通過spark.task.maxFailures來配置),多久推測執行一次spark.speculation.interval(當然是在spark.speculation為true的情況下生效)等等。這裡還有個排程模式,排程模式分為FIFO和FAIR兩種,通過修改參數spark.scheduler.mode來改變。 最終建立TaskResultGetter,它的作用是對executor中的task的執行結果進行處理。

  随之,開始建立DAG。DAGScheduler主要用于在任務正式交給TaskSchedulerImpl送出之前做一些準備工作。建立job,将DAG中的RDD劃分到不同的Stage,送出Stage,等等。

SparkConf加載與SparkContext建立(源碼閱讀四)

  我們繼續深入看下它的建立過程。

SparkConf加載與SparkContext建立(源碼閱讀四)

  從這些變量中,我們可以看到,DAG是将所有jobId,stageId等資訊之間的關系,以及緩存的RDD的partition位置等。比如getCacheLocs、getShuffleMapStage、getParentStagesAndId、newOrUsedShuffleStage。下來,通過applicationId注冊并建立executor.

SparkConf加載與SparkContext建立(源碼閱讀四)

  中間省略一萬字(其實是沒看懂),下來建立并啟動ExecutorAllocationManager,它是幹嘛的呢?

SparkConf加載與SparkContext建立(源碼閱讀四)

  ExecutorAllocationManager是對所有的已配置設定的Executor進行管理。預設情況下不會建立ExecutorAllocationManager,可以修改屬性spark.dynamicAllocation.enabled為true來建立。ExecutorAllocationManager可以設定動态配置設定最小Executor數量、動态配置設定最大Executor數量,每個Executor可以運作的Task數量等配置資訊。(這個還真要試一下,沒有配置過)ExecutorAllocationListener通過監聽listenerBus裡的事件、動态添加、删除exeuctor,通過Thread不斷添加Executor,周遊Executor,将逾時的Executor殺掉并移除。

SparkConf加載與SparkContext建立(源碼閱讀四)

參考文獻:《深入了解Spark核心思想與源碼解析》

繼續閱讀