天天看點

Spark Job的送出與task本地化分析(源碼閱讀八)

  我們又都知道,Spark中任務的處理也要考慮資料的本地性(locality),Spark目前支援PROCESS_LOCAL(本地程序)、NODE_LOCAL(本地節點)、NODE_PREF、RACK_LOCAL(本地機架)、ANY(任何)幾種。其他都很好了解,NODE_LOCAL會在spark日志中執行拉取資料所執行的task時,列印出來,因為Spark是移動計算,而不是移動資料的嘛。

  那麼什麼是NODE_PREF?

  當Driver應用程式剛剛啟動,Driver配置設定獲得的Executor很可能還沒有初始化,是以有一部分任務的本地化級别被設定為NO_PREF.如果是ShuffleRDD,其本地性始終為NO_PREF。這兩種本地化級别是NO_PREF的情況,在任務配置設定時會被優先配置設定到非本地節點執行,達到一定的優化效果。

  那麼下來我們從job的任務送出開始玩起~

   

Spark Job的送出與task本地化分析(源碼閱讀八)

  getMissingParentStages方法用來找到Stage的所有不可用的父Stage.從代碼可以到這裡是個遞歸的調用,submitWaitingStages實際上循環waitingStages中的stage并調用submitStaghe:

  

Spark Job的送出與task本地化分析(源碼閱讀八)

  那麼下來開始送出task,送出task的入口是submitMissingTasks,此函數在Stage沒有可用的父stage時,被調用處理目前Stage未送出的任務。

  1、那麼在沒有父stage時,會首先調用paendingPartitions.clear 用于清空pendingTasks.由于目前Stage的任務剛開始送出,是以需要清空,便于記錄需要計算的任務。

  2、将目前Stage加入運作中的Stage集合,是用HashSet進行構造的。

  3、找出位計算的partition,如果Stage是map任務,那麼outputLocs中partition對應的List為Nil,說明此partition還未計算。如果Stage不是map任務,那麼需要擷取stage的finalJob,調用finished方法判斷每個partition的任務是否完成。

Spark Job的送出與task本地化分析(源碼閱讀八)

  4、然後通過stage.makeNewStageAttemp,使用StageInfo.fromStage方法建立目前Stage的_latestInfo:

Spark Job的送出與task本地化分析(源碼閱讀八)

  5、如果是Stage Map任務,那麼序列化Stage的RDD及ShuffleDependency,如果Stage不是map任務,那麼序列化Stage的RDD及resultOfJob的處理函數。最終這些序列化得到的位元組數組需要用sc.broadcast進行廣播。

Spark Job的送出與task本地化分析(源碼閱讀八)

  6、最後,建立所有Task、目前stage的id、jobId等資訊建立TaskSet,并調用taskScheduler的submitTasks,批量送出Stage及其所有Task.

Spark Job的送出與task本地化分析(源碼閱讀八)

  有可能同時有多個任務送出,是以就有了排程政策FIFO,那麼下來調用LocalBackend的reviveOffers方法,向local-Actor發送ReviveOffers消息。localActor對ReviveOffers消息的比對執行reviveOffers方法。調用TaskSchedulerImpl的resourceOffers方法配置設定資源,最後調用Executor的launchTask方法運作任務。

Spark Job的送出與task本地化分析(源碼閱讀八)

  同時你會發現,這裡有段代碼,shuffleOffers = Random.shuffle(offers),是為了計算資源的配置設定與計算,對所有WorkerOffer随機洗牌,避免将任務總是配置設定給同樣的WorkerOffer。

  好了,知道了整個流程,下來我們來看一下本地化問題:

Spark Job的送出與task本地化分析(源碼閱讀八)

  myLocalityLevles:目前TaskSetManager允許使用的本地化級别。那麼這裡的computeValidLocalityLevels方法是用于計算有效的本地化緩存級别。如果存在Executor中的有待執行的任務,且PROCESS_LOCAL本地化的等待時間不為0,且存在Executor已被激活,那麼允許的本地化級别裡包括PROCESS_LOCAL.

Spark Job的送出與task本地化分析(源碼閱讀八)

  這裡又發現新大陸,擷取各個本地化級别的等待時間。

  spark.locality.wait 本地化級别的預設等待時間 

  spark.locality.wait.process 本地程序的等待時間

  spark.locality.wait.node 本地節點的等待時間

  spark.locality.wait.rack 本地機架的等待時間

  這些參數呢,在任務的運作很長且數量很多的情況下,适當調高這些參數可以顯著提高性能,然而當這些參數值都已經超過任務的運作時長時,則需要調小這些參數。任何任務都希望被配置設定到可以從本地讀取資料的節點上以得到最大的性能提升,然而每個任務的運作時長時不可預計的。當一個任務在配置設定時,如果沒有滿足最佳本地化(PROCESS_LOCAL)的資源時,如果固執的期盼得到最佳的資源,很可能被已經占用最佳資源但是運作時間很長的任務耽誤,是以這些代碼實作了當沒有最佳本地化時,選擇稍差點的資源。

參考文獻:《深入了解Spark:核心思想與源碼分析》

繼續閱讀