天天看點

Spark源碼分析之三:Stage劃分

        Stage劃分的大體流程如下圖所示:

Spark源碼分析之三:Stage劃分

        前面提到,對于JobSubmitted事件,我們通過調用DAGScheduler的handleJobSubmitted()方法來處理。那麼我們先來看下代碼:

        這個handleJobSubmitted()方法一共做了這麼幾件事:

        第一,調用newResultStage()方法,生成Stage,包括最後一個Stage:ResultStage和前面的Parent Stage:ShuffleMapStage;

        第二,建立一個ActiveJob對象job;

        第三,清除RDD分區位置緩存;

        第四,調用logInfo()方法記錄日志資訊;

        第五,維護各種資料對應關系涉及到的資料結構:

        (1)将jobId-->ActiveJob的對應關系添加到HashMap類型的資料結構jobIdToActiveJob中去;

        (2)将ActiveJob添加到HashSet類型的資料結構activeJobs中去;

        第六,送出Stage;

        下面,除了送出Stage留在第三階段外,我們挨個分析第二階段的每一步。

        首先是調用newResultStage()方法,生成Stage,包括最後一個Stage:ResultStage和前面的Parent Stage:ShuffleMapStage。代碼如下:

        首先,根據fianl RDD擷取parent stages及id,這個id為ResultStage的stageId;

        其次,建立一個ResultStage,即為整個Job的finalStage;

        然後,将stage加入到資料結構stageIdToStage中;

        接着,更新資料結構jobIdToStageIds;

        最後,傳回這個ResultStage。

        我們一步步來看。首先調用getParentStagesAndId()方法,根據fianl RDD擷取parent stages及id,這個id為ResultStage的stageId。代碼如下:

        這個id即為下一個stageId,通過AtomicInteger類型的getAndIncrement()獲得,能夠保證原子性。繼續分析getParentStages()方法,通過它來擷取final RDD的parent stage。代碼如下:

        getParentStages()方法在其内部定義了如下資料結構:

        parents:用HashSet存儲parents stages,即finalRDD的所有parent stages,也就是ShuffleMapStage;

        visited:用HashSet存儲已經被通路過的RDD,在RDD被處理前先存入該HashSet,保證存儲在裡面的RDD将不會被重複處理;

        waitingForVisit:存儲需要被處理的RDD。Stack中得RDD都需要被處理。

        getParentStages()方法在其内部還定義了一個visit()方法,傳入一個RDD,如果之前沒有處理過,标記為已處理,并循環此RDD的依賴關系dependencies,如果是ShuffleDependency,調用getShuffleMapStage()方法擷取其parent stage;如果不是,則說明為同一stage,并壓入Stack:waitingForVisit頂部,等待後續通過visit()方法處理。是以,getParentStages()方法從finalRDD開始,逐漸往上查找,如果是窄依賴,證明在同一個Stage中,繼續往上查找,如果是寬依賴,通過getShuffleMapStage()方法擷取其parent

stage,就能得到整個Job中所有的parent stages,也就是ShuffleMapStage。

        接下來,我們看下getShuffleMapStage()方法的實作。代碼如下:

        從getShuffleMapStage()方法的注釋就能看出,這個方法的主要作用就是針對給定的shuffle dependency的map端,擷取或者建立一個ShuffleMapStage。為何是Get or create呢?通過源碼得知,getShuffleMapStage()方法首先會根據shuffleDep.shuffleId從資料結構shuffleToMapStage中查找哦是否存在對應的stage,如果存在則直接傳回,如果不存在,則調用newOrUsedShuffleStage()方法建立一個Stage并添加到資料結構shuffleToMapStage中,友善後續需要使用此Stage者直接使用。在此之前,會根據入參ShuffleDependency的rdd發現還沒有在shuffleToMapStage中注冊的祖先shuffle

dependencies,然後周遊每個ShuffleDependency,調用newOrUsedShuffleStage()方法為每個ShuffleDependency産生Stage并添加到資料結構shuffleToMapStage中。

        下面,我們看下這個getAncestorShuffleDependencies()方法的實作,代碼如下:

        通過代碼我們可以發現,它和getParentStages()方法的代碼風格非常相似。在其内部也定義了三個資料結構:

        parents:存放parents的棧,即Stack,用于存放入參RDD的在shuffleToMapStage中未注冊過的祖先shuffle dependencies;

        visited:存放已經處理過的RDD的哈希表,即HashSet;

        waitingForVisit:存放等待被處理的RDD的棧,即Stack;

        定義了一個visit()方法,入參為RDD,針對傳入的RDD,如果之前沒有處理過則标記為已處理,并循環RDD的所有依賴,如果是如果是ShuffleDependency,并且其依賴的shuffleId在shuffleToMapStage中沒有,添加到parents中,否則直接跳過,最後無論為何種Dependency,都将該dependence的rdd壓入waitingForVisit棧頂部,等待後續處理。

        接下來,我們再看下newOrUsedShuffleStage()方法,其代碼如下:

        這個方法的主要完成了以下兩件事:

        1、構造一個ShuffleMapStage執行個體stage;

        2、判斷是否在mapOutputTracker中存在:

           (1)如果不存在,調用mapOutputTracker的registerShuffle()方法注冊一個,注冊的内容為根據shuffleDep擷取的shuffleId和rdd中分區的個數;

           (2)如果存在,根據shuffleId從mapOutputTracker中擷取序列化的多個MapOutputStatus對象,反序列化後循環,逐個添加到stage中。

        緊接着,看下newShuffleMapStage()方法,其代碼如下:

        可以發現,這個方法也調用了getParentStagesAndId()方法,這樣,就形成了一個遞歸,按照RDD的依賴關系,由後往前,逐漸生成Stage。代碼剩餘的部分就是建立一個ShuffleMapStage,并将stage加入到資料結構stageIdToStage,以及調用updateJobIdStageIdMaps()方法更新相關資料結構。這個updateJobIdStageIdMaps()方法留待下面分析。

        下面,簡單看下mapOutputTracker注冊的代碼。

        很簡單,将shuffleId、numMaps大小和MapStatus類型的Array數組的映射關系,放入mapStatuses中,mapStatuses為TimeStampedHashMap[Int, Array[MapStatus]]類型的資料結構。

       經曆了這多又長又大篇幅的叙述,現在傳回newResultStage()方法,在通過getParentStagesAndId()方法擷取parent stages及其result stage的id後,緊接着建立一個ResultStage,并将stage加入到stageIdToStage中,最後在調用updateJobIdStageIdMaps()更新資料結構jobIdToStageIds後,傳回stage。

        下面,簡單看下updateJobIdStageIdMaps()方法。代碼如下:

        這個方法的實作比較簡單,在其内部定義了一個函數updateJobIdStageIdMapsList(),首選傳入result stage,将jobId添加到stage的jobIds中,更新jobIdToStageIds,将jobId與stageIds的對應關系添加進去,然後根據給定stage的RDD擷取其parent stages,過濾出不包含此JobId的parents stages,再遞歸調用updateJobIdStageIdMapsList()方法,直到全部stage都處理完。

        至此,第二階段Stage劃分大體流程已分析完畢,有遺漏或不清楚的地方,以後再查缺補漏以及細化及更正錯誤。

繼續閱讀