天天看點

spark基礎之排程器運作機制簡述

一 概述

驅動程式在啟動的時候,首先會初始化SparkContext,初始化SparkContext的時候,就會建立DAGScheduler、TaskScheduler、SchedulerBackend等,同時還會向Master注冊程式;如果注冊沒有問題。Master通過叢集管理器(cluster manager)會給這個程式配置設定資源,然後SparkContext根據action觸發job。

Job裡面有一系列RDD, DAGScheduler從後往前推若發現是寬依賴的話,就劃分不同的Stage。

Stage劃分完後,Stage送出給底層的排程器TaskScheduler,TaskScheduler拿到這個Task的集合,因為Stage内部都是計算邏輯完全一樣的任務,隻是資料不一樣而已。TaskScheduler就會根據資料本底性,将任務配置設定到Executor上執行。

Executor在任務運作完畢或者出狀況時,肯定要向Driver彙報

最後運作完畢,關閉SparkContext,同時建立的那些對象也被關掉。

二 什麼是Spark Driver 程式

Driver程式就是運作應用程式的main函數,它會建立SparkContext,準備應用程式的運作環境(初始化各個元件,比如DAGScheduler等),

然後應用程式由SparkContext負責和叢集通信,資源的申請以及任務的配置設定和監控等。當Worker節點的Executor執行完Task之後,Driver同時負責将SparkContext關閉。

三 SparkContext

SparkContext是使用者和Spark叢集進行互動的唯一入口,可以用來在Spark叢集中建立RDD,累加器Accumulator和廣播變量; 它也是驅動程式至關重要的對象,由它提供應用程式所需要的運作環境。

SparkContext的核心作用就是準備應用程式運作環境,是以在初始化的時候會構造一系列對象DAGScheduler, TaskScheduler等,同時負責向Master注冊應用程式

隻可以有一個SparkContext執行個體運作在一個JVM中,是以在建立SparkContext的時候之前,確定之前的SparkContext已經關閉了,即調用stop方法停止目前JVM中唯一運作的SparkContext

四 Spark Job的觸發

# 每一個final RDD的action操作會觸發一個job,比如count,collect,saveAsTextFile,forEach等都會觸發job。這就意味着應用程式如果有多個action操作 .

# 每一個Job根據寬依賴來劃分Stage,每一個job可能有一個或者多個Stage,比如reduceByKey,groupByKey等算子,每一個Stage生成一個Task

# 所有的Stage會形成一個DAG(有向無環圖),由于RDD的Lazy特性,導緻Stage也是Lazy級别的,隻有遇到了Action才會真正發生作業的執行,在Action之前,Spark架構隻是将要進行的計算記錄下來,并沒有真的執行。

# 一個作業可能有ResultStage和ShuffleMapStage組成:一個作業如果shuffle操作,那麼就隻有一個ResultStage;如果有shuffle操作,那麼,則存在一個ResultStage和至少一個ShuffleMapStage

spark基礎之排程器運作機制簡述
spark基礎之排程器運作機制簡述

五 DAGScheduler

# DAG:Direct Acyclic Graph,spark主要用于RDD關系模組化,描述RDD之間的依賴關系,主要用于建構RDD的資料流,即RDD的各個分區資料是從哪裡來的和建構基于資料流之上的操作算子流,即RDD各個分區資料總共會經過哪些transformation和action的這兩種類型的一系列的操作的排程運作

# DAGScheduler需要解析DAG.它是一個面向stage的高層排程器,它把DAG拆分成很多個Task,每一組task都是一個stage,解析的時候,每當遇到shuffle操作的時候就會産生新的stage,然後以一個個TaskSet的形式送出給底層的排程器TaskScheduler.

# DAGScheduler需要記錄哪些RDD需要寫入磁盤

# DAGScheduler 需要尋求Task的最優排程,比如stage内部資料的本地性等

# DAGScheduler 需要監視因為shuffle跨節點輸出可能導緻的失敗,如果發現stage失敗,可能需要重新送出stage

Job、Stage、TaskSet、Task含義和關系:

Job: 一個action操作就會觸發一個job,如果有多個action操作就會有多個job.

Stage: 一個Job會被DAGScheduler拆分成多組任務,每一組任務就是由一個Stage封裝,stage之間也有依賴關系。如果RDD之間沒有shuffle操作那麼就隻有一個ResultStage;如果有shuffle操作,那麼就有一個ResultStage和至少一個ShuffleMapStage

TaskSet: 一組任務就是一個TaskSet,對應着一個Stage,是以也可以了解為一個Stage就是一個TaskSet

Task: 一個獨立的工作單元,由驅動程式發送到Executor上去執行。通常情況下,一個Task處理一個RDD的分區的資料,根據傳回類型不同,又分為ResultTask和ShuffleMapTask

六 TaskScheduler

TaskScheduler主要是送出TaskSet到叢集運算并彙報結果

# 為TaskSet建立和維護一個TaskSetManager,并追蹤任務本地性及錯誤資訊

# 遇到一些迷路的任務(straggle)會放在其他節點重試

# 向DAGScheduler彙報執行情況,包括shuffle輸出丢失時報告fetch failed錯誤

七 SchedulerBackend

排程器的通信終端,以SparkDeploySchedulerBackend在啟動時,構造了AppClient執行個體,并在該執行個體start時啟動ClientEndpoint消息循環體,ClientEndpoint在啟動時會向Master注冊目前程式。

SparkDeploySchedulerBackend的父類CoarseGrainedSchedulerBackend在start時會執行個體化類型為DriverEndPoint消息循環體,SparkDeploySchedulerBackend專門負責收集Worker上資源資訊,當ExecutorBackend啟動時會發送RegisteredExecutor資訊向DriverPoint注冊,此時SparkDeploySchedulerBackend就掌握了目前應用程式所擁有的計算資源。

繼續閱讀