天天看點

Spark系列之Spark應用程式運作機制第六章 Spark應用程式運作機制

聲明:

        文章中代碼及相關語句為自己根據相應了解編寫,文章中出現的相關圖檔為自己實踐中的截圖和相關技術對應的圖檔,若有相關異議,請聯系删除。感謝。轉載請注明出處,感謝。

By luoyepiaoxue2014

B站:https://space.bilibili.com/1523287361 點選打開連結

微網誌位址: https://weibo.com/luoyepiaoxue2014 點選打開連結

title: Spark系列

第六章 Spark應用程式運作機制

6.1 Spark的基本運作流程

Spark任務的核心執行流程主要分為四大步驟:

Driver工作:Build DAG
DAGScheduler工作:Split DAG to Stage
TaskScheduler工作:Change Stage to TaskSet
Worker工作:execute Task
           
Spark系列之Spark應用程式運作機制第六章 Spark應用程式運作機制
Spark系列之Spark應用程式運作機制第六章 Spark應用程式運作機制

核心四大步驟:

1、建構DAG
使用算子操作RDD進行各種transformation操作,最後通過action操作觸發Spark作業運作。送出之後Spark會根據轉換過程所産生的RDD之間的依賴關系建構有向無環圖。

2、DAG切割
DAG切割主要根據RDD的依賴是否為寬依賴來決定切割節點,當遇到寬依賴就将任務劃分為一個新的排程階段(Stage)。每個Stage中包含一個或多個Task。這些Task将形成任務集(TaskSet),送出給底層排程器進行排程運作。

3、任務排程
每一個Spark任務排程器隻為一個SparkContext執行個體服務。當任務排程器收到任務集後負責把任務集以Task任務的形式分發至Worker節點的Executor程序中執行,如果某個任務失敗,任務排程器負責重新配置設定該任務的計算。

4、執行任務
當Executor收到發送過來的任務後,将以多線程(會在啟動executor的時候就初始化好了一個線程池)的方式執行任務的計算,每個線程負責一個任務,任務結束後會根據任務的類型選擇相應的傳回方式将結果傳回給任務排程器。

           

6.2 運作流程圖解

1、建構Spark Application的運作環境(初始化SparkContext),SparkContext向資料總管(可以是Standalone、Mesos或YARN)注冊并申請運作Executor資源

2、資料總管配置設定Executor資源并啟動StandaloneExecutorBackend,Executor運作情況将随着心跳發送到資料總管上

3、SparkContext建構成DAG圖,将DAG圖分解成Stage,并把Taskset發送給TaskScheduler。Executor向SparkContext申請Task,TaskScheduler将Task發放給Executor運作同時SparkContext将應用程式代碼發放給Executor

4、Task在Executor上運作,運作完畢釋放所有資源。
           
Spark系列之Spark應用程式運作機制第六章 Spark應用程式運作機制
Spark系列之Spark應用程式運作機制第六章 Spark應用程式運作機制
Spark系列之Spark應用程式運作機制第六章 Spark應用程式運作機制
Spark系列之Spark應用程式運作機制第六章 Spark應用程式運作機制

6.3 SparkContext初始化

關于SparkContext:

1、SparkContext是使用者通往Spark叢集的唯一入口,可以用來在Spark叢集中建立RDD、累加器Accumulator和廣播變量Braodcast Variable

2、SparkContext 也是整個Spark應用程式中至關重要的一個對象,可以說是整個應用程式運作排程的核心(不是指資源排程)

3、SparkContext在執行個體化的過程中會初始化DAGScheduler、TaskScheduler和SchedulerBackend

4、SparkContext會調用DAGScheduler将整個Job劃分成幾個小的階段(Stage),TaskScheduler會排程每個Stage的任務(Task)應該如何處理。另外,SchedulerBackend管理整個叢集中為這個目前的應用配置設定的計算資源(Executor)

           

初始化流程:

1、處理使用者的jar或者資源檔案,和日志處理相關

Spark系列之Spark應用程式運作機制第六章 Spark應用程式運作機制

2、初始化異步監聽bus:監聽spark事件,用于SparkUI的跟蹤管理

_listenerBus = new LiveListenerBus(_conf)
           

3、初始化Spark運作環境相關變量

// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
           

4、啟動心跳接收器:在建立taskScheduler之間需要先注冊HeartbeatReceiver,因為Executor在建立 時回去檢索HeartbeatReceiver

// We need to register "HeartbeatReceiver" before "createTaskScheduler" because
Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
	HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
           

5、建立SchedulerBackend、TaskScheduler、DAGScheduler

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
           

6、啟動TaskScheduler:在TaskScheduler被DAGScheduler引用後,就可以進行啟動

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
           
排程啟動兩個角色: 通訊兵
1、跟master打交道:clientEndpoint 負責送出app到master
2、跟worker打交道:driverEndpoint 負責派發task到worker
           

7、Post Init 各種啟動

setupAndStartListenerBus()
postEnvironmentUpdate()
postApplicationStart()

// Post init
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
           

6.4 DAGScheduler

核心描述:

一個Application = 多個job
一個job = 多個stage,也可以說一個application = 多個stage
一個Stage = 多個同種task并行運作
Task 分為 ShuffleMapTask 和 ResultTask
Dependency 分為 ShuffleDependency寬依賴 和 NarrowDependency窄依賴
面向stage的切分,切分依據為寬依賴
           

DAGScheduler工作機制:

DAGScheduler拿到一個job,會切分成多個stage
從job的後面往前去尋找Shuffle算子,如果找到一個shuffle算子,就切分。已經找到的這些rdd的執行鍊連成一個stage,放入到一個 資料結構中(棧 )
将來DAGScheduler要幫這個棧中的每一個stage拿出來,送出給TaskScheduler
           

主要作用:

維護waiting jobs和active jobs兩個隊列,維護waiting stages、active stages和failed stages,以及與jobs的映射關系
           

主要職能:

1、接收送出Job的主入口,submitJob(rdd, ...)或runJob(rdd, ...)。在SparkContext裡會調用這兩個方法。 
應用程式的main運作,最終會排程:DAGScheduler的runJob()方法

生成一個Stage并送出,接着判斷Stage是否有父Stage未完成,若有,送出并等待父Stage,以此類推。結果是:DAGScheduler裡增加了一些waiting stage和一個running stage。
running stage送出後,分析stage裡Task的類型,生成一個Task描述,即TaskSet。
調用TaskScheduler.submitTask(taskSet, ...)方法,把Task描述送出給TaskScheduler。TaskScheduler依據資源量和觸發配置設定條件,會為這個TaskSet配置設定資源并觸發執行。
DAGScheduler送出job後,異步傳回JobWaiter對象,能夠傳回job運作狀态,能夠cancel job,執行成功後會處理并傳回結果

2、處理TaskCompletionEvent 

如果task執行成功,對應的stage裡減去這個task,做一些計數工作: 
A:如果task是ResultTask,計數器Accumulator加一,在job裡為該task置為true,job finish總數加一。加完後如果finish數目與partition數目相等,說明這個stage完成了,标記stage完成,從running stages裡減去這個stage,做一些stage移除的清理工作
B:如果task是ShuffleMapTask,計數器Accumulator加一,在stage裡加上一個output location,裡面是一個MapStatus類。MapStatus是ShuffleMapTask執行完成的傳回,包含location資訊和block size(可以選擇壓縮或未壓縮)。同時檢查該stage完成,向MapOutputTracker注冊本stage裡的shuffleId和location資訊。然後檢查stage的output location裡是否存在空,若存在空,說明一些task失敗了,整個stage重新送出;否則,繼續從waiting stages裡送出下一個需要做的stage
C:如果task是重送出,對應的stage裡增加這個task:
如果task是fetch失敗,馬上标記對應的stage完成,從running stages裡減去。如果不允許retry,abort整個stage;否則,重新送出整個stage。另外,把這個fetch相關的location和map任務資訊,從stage裡剔除,從MapOutputTracker登出掉。最後,如果這次fetch的blockManagerId對象不為空,做一次ExecutorLost處理,下次shuffle會換在另一個executor上去執行。
D:其他task狀态會由TaskScheduler處理,如Exception, TaskResultLost, commitDenied等。

3、其他與job相關的操作還包括:cancel job,cancel stage, resubmit failed stage等

4、其他職能:cacheLocations 和 preferLocation

           

6.5 TaskScheduler

核心功能:

維護task和executor對應關系,executor和實體資源對應關系,在排隊的task和正在跑的task。維護内部一個任務隊列,根據FIFO或Fair政策,排程任務。

TaskScheduler本身是個接口,spark裡隻實作了一個TaskSchedulerImpl,理論上任務排程可以定制。
           

主要職能:

1、submitTasks(taskSet),接收DAGScheduler送出來的tasks 
為tasks建立一個TaskSetManager,添加到任務隊列裡。TaskSetManager跟蹤每個task的執行狀況,維護了task的許多具體資訊。
觸發一次資源的需要。 
首先,TaskScheduler對照手頭的可用資源和Task隊列,進行executor配置設定(考慮優先級、本地化等政策),符合條件的executor會被配置設定給TaskSetManager。
然後,得到的Task描述交給SchedulerBackend,調用launchTask(tasks),觸發executor上task的執行。task描述被序列化後發給executor,executor提取task資訊,調用task的run()方法執行計算。

2、cancelTasks(stageId),取消一個stage的tasks 
調用SchedulerBackend的killTask(taskId, executorId, ...)方法。taskId和executorId在TaskScheduler裡一直維護着。

3、resourceOffer(offers: Seq[Workers]),這是非常重要的一個方法,調用者是SchedulerBacnend,用途是底層資源SchedulerBackend把空餘的workers資源交給TaskScheduler,讓其根據排程政策為排隊的任務配置設定合理的cpu和記憶體資源,然後把任務描述清單傳回給SchedulerBackend 

從worker offers裡,搜集executor和host的對應關系、active executors、機架資訊等等。worker offers資源清單進行随機洗牌,任務隊列裡的任務清單依據排程政策進行一次排序

周遊每個taskSet,按照程序本地化、worker本地化、機器本地化、機架本地化的優先級順序,為每個taskSet提供可用的cpu核數,看是否滿足 
預設一個task需要一個cpu,設定參數為"spark.task.cpus=1"

為taskSet配置設定資源,校驗是否滿足的邏輯,最終在TaskSetManager的resourceOffer(execId, host, maxLocality)方法裡。滿足的話,會生成最終的任務描述,并且調用DAGScheduler的taskStarted(task, info)方法,通知DAGScheduler,這時候每次會觸發DAGScheduler做一次submitMissingStage的嘗試,即stage的tasks都配置設定到了資源的話,馬上會被送出執行

4、statusUpdate(taskId, taskState, data),另一個非常重要的方法,調用者是SchedulerBacnend,用途是SchedulerBacnend會将task執行的狀态彙報給TaskScheduler做一些決定 
若TaskLost,找到該task對應的executor,從active executor裡移除,避免這個executor被配置設定到其他task繼續失敗下去。
task finish包括四種狀态:finished, killed, failed, lost。隻有finished是成功執行完成了。其他三種是失敗。
task成功執行完,調用TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data),否則調用TaskResultGetter.enqueueFailedTask(taskSet, tid, state, data)。TaskResultGetter内部維護了一個線程池,負責異步fetch task執行結果并反序列化。預設開四個線程做這件事,可配參數"spark.resultGetter.threads"=4。
           

補充:TaskResultGetter取task result的邏輯

1、對于success task,如果taskResult裡的資料是直接結果資料,直接把data反序列出來得到結果;如果不是,會調用blockManager.getRemoteBytes(blockId)從遠端擷取。如果遠端取回的資料是空的,那麼會調用TaskScheduler.handleFailedTask,告訴它這個任務是完成了的但是資料是丢失的。否則,取到資料之後會通知BlockManagerMaster移除這個block資訊,調用TaskScheduler.handleSuccessfulTask,告訴它這個任務是執行成功的,并且把result data傳回去。

2、對于failed task,從data裡解析出fail的理由,調用TaskScheduler.handleFailedTask,告訴它這個任務失敗了,理由是什麼。
           

6.6 SchedulerBackend

​ 在TaskScheduler下層,用于對接不同的資源管理系統,SchedulerBackend是個接口,需要實作的主要 方法如下:

def start():Unit

def stop():Unit

// 重要方法:SchedulerBackend把自己手頭上的可用資源交給TaskScheduler,TaskScheduler根據排程政策配置設定給排隊的任務嗎,傳回一批可執行的任務描述,SchedulerBackend負責launchTask,即最終把task塞到了executor模型上,executor裡的線程池會執行task的run()
def reviveOffers():Unit 

def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = throw new UnsupportedOperationException
           

粗粒度:程序常駐的模式,典型代表是Standalone模式,Mesos粗粒度模式,YARN 。

細粒度:Mesos細粒度模式

這裡讨論粗粒度模式,更好了解:CoarseGrainedSchedulerBackend。維護executor相關資訊(包括 executor的位址、通信端口、host、總核數,剩餘核數),手頭上executor有多少被注冊使用了,有多 少剩餘,總共還有多少核是空的等等。

主要職能:

1、	Driver端主要通過actor監聽和處理下面這些事件:

RegisterExecutor(executorId, hostPort, cores, logUrls)
這是executor添加的來源,通常worker拉起、重新開機會觸發executor的注冊。CoarseGrainedSchedulerBackend把這些executor維護起來,更新内部的資源資訊,比如總核數增加。最後調用一次makeOffer(),即把手頭資源丢給TaskScheduler去配置設定一次,傳回任務描述回來,把任務launch起來。這個makeOffer()的調用會出現在任何與資源變化相關的事件中,下面會看到。

StatusUpdate(executorId, taskId, state, data)
task的狀态回調。首先,調用TaskScheduler.statusUpdate上報上去。然後,判斷這個task是否執行結束了,結束了的話把executor上的freeCore加回去,調用一次makeOffer()。

ReviveOffers
這個事件就是别人直接向SchedulerBackend請求資源,直接調用makeOffer()。

KillTask(taskId, executorId, interruptThread)
這個killTask的事件,會被發送給executor的actor,executor會處理KillTask這個事件。

StopExecutors
通知每一個executor,處理StopExecutor事件。

RemoveExecutor(executorId, reason)
從維護資訊中,那這堆executor涉及的資源數減掉,然後調用TaskScheduler.executorLost()方法,通知上層我這邊有一批資源不能用了,你處理下吧。TaskScheduler會繼續把executorLost的事件上報給DAGScheduler,原因是DAGScheduler關心shuffle任務的output location。DAGScheduler會告訴BlockManager這個executor不可用了,移走它,然後把所有的stage的shuffleOutput資訊都周遊一遍,移走這個executor,并且把更新後的shuffleOutput資訊注冊到MapOutputTracker上,最後清理下本地的CachedLocationsMap。

2、reviveOffers()方法的實作。直接調用了makeOffers()方法,得到一批可執行的任務描述,調用launchTasks。

3、launchTasks(tasks: Seq[Seq[TaskDescription]])方法。 
周遊每個task描述,序列化成二進制,然後發送給每個對應的executor這個任務資訊 如果這個二進制資訊太大,超過了9.2M(預設的akkaFrameSize 10M減去預設為akka留白的200K),會出錯,abort整個taskSet,并列印提醒增大akka frame size。如果二進制資料大小可接受,發送給executor的actor,處理LaunchTask(serializedTask)事件。
           

6.7 Executor

Executor 是 Spark 裡的程序模型,可以套用到不同的資源管理系統上,與 SchedulerBackend 配合使用。

内部有個線程池,有個 running tasks map,有個 actor,接收上面提到的由 SchedulerBackend 發來 的事件。

Spark系列之Spark應用程式運作機制第六章 Spark應用程式運作機制

事件處理:

launchTask。根據task描述,生成一個TaskRunner線程,丢盡running tasks map裡,用線程池執行這個TaskRunner

killTask。從running tasks map裡拿出線程對象,調它的kill方法。
           

6.8 Spark運作架構特點

1、每個Application擷取專屬的executor程序,該程序在Application期間一直駐留,并以多線程方式運作tasks。這種Application隔離機制有其優勢的,無論是從排程角度看(每個Driver排程它自己的任務),還是從運作角度看(來自不同Application的Task運作在不同的JVM中)。當然,這也意味着Spark Application不能跨應用程式共享資料,除非将資料寫入到外部存儲系統。

2、Spark與資料總管無關,隻要能夠擷取executor程序,并能保持互相通信就可以了。

3、送出SparkContext的Client應該靠近Worker節點(運作Executor的節點),最好是在同一個Rack裡,因為Spark Application運作過程中SparkContext和Executor之間有大量的資訊交換;如果想在遠端叢集中運作,最好使用RPC将SparkContext送出給叢集,不要遠離Worker運作SparkContext。

4、Task采用了資料本地性和推測執行的優化機制。
           

6.9 Spark任務執行流程分析

6.9.1 Spark任務的任務執行流程圖解

Spark系列之Spark應用程式運作機制第六章 Spark應用程式運作機制

6.9.2 Spark任務的任務執行流程文字描述簡介

送出application到standalone叢集運作的指令

$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop10:7077 \
--driver-memory 512m \
--executor-memory 512m \
--total-executor-cores 1 \
$SPARK_HOME/examples/jars/spark-examples_2.12-3.1.2.jar \
10
           

詳細的執行過程:

--> Spark-submit.sh
--> SparkSubmit.scala
--> main
--> submit
--> doRunMain
--> RunMain
--> 通過反射建立我們編寫的主類的執行個體對象JavaMainApplication,調用main方法
--> 開始執行我們編寫的業務代碼
--> 初始化SparkContext對象 – 最複雜
--> 建立初始的RDD
--> 觸發Action算子
--> 送出job
--> worker執行任務
--> 任務結束
           

6.9.3 Spark任務的任務執行流程文字較長的描述

(1)、将我們編寫的程式打成jar包

(2)、調用spark-submit腳本送出任務到叢集上運作

(3)、運作SparkSubmit類中的main方法,在這個方法中通過反射的方式建立我們編寫的主類的執行個體對象,然後調用main方法,開始執行我們的代碼(注意,我們的spark程式中的driver就運作在SparkSubmit程序中)

(4)、當代碼運作到建立SparkContext對象時,那就開始初始化SparkContext對象了

(5)、在初始化SparkContext對象的時候,會建立三個特别重要的對象,分别是:DAGScheduler 
和TaskScheduler和SchedulersBackend
【DAGScheduler的作用】将RDD的依賴切分成一個一個的stage,然後将stage作為taskSet送出給DriverActor
【TaskScheduler的作用】将接收到的TaskSet進行解析,派發到不同Worker中的Executor中去執行
【SchedulersBackend的作用】負責和Master和Worker進行互動
他們之間的初始化順序是:TaskScheduler > SchedulersBackend > DAGScheduler

(6)、在建構SchedulersBackend的同時,會建立兩個非常重要的對象,舊版本叫做分别是DriverActor和ClientActor,新版本叫做:DriverEndpoint和StandAloneAppClient
【clientActor的作用】向Master注冊使用者送出的任務 
【DriverActor的作用】接受Executor的反向注冊,将任務送出給Executor

(7)、當ClientActor啟動後,會将使用者送出的任務和相關的參數封裝到ApplicationDescription對象中,再封裝在RegisterApplication消息中,然後送出給master進行任務的注冊

(8)、當Master接受到ClientActor送出的任務請求時,會将請求參數進行解析,并封裝成ApplicationInfo消息,然後将其持久化,然後将其加入到任務隊列waitingApps中,然後傳回給ClientActor一個RegisteredApplication消息

(9)、當輪到我們送出的任務運作時,就開始調用schedule(),進行任務資源的排程

(10)、Master将排程好的資源封裝到LaunchExecutor中發送給指定的Worker

(11)、Worker接受到Master發送來的LaunchExecutor時,會将其解壓并封裝到ExecutorRunner中,然後調用這個對象的start(), 啟動Executor

(12)、Executor啟動後會向DriverActor進行反向注冊

(13)、DriverActor會發送注冊成功的消息給Executor

(14)、Executor接受到DriverActor注冊成功的消息後會建立一個線程池,用于執行DriverActor發送過來的task任務

(15)、當屬于這個任務的所有的Executor啟動并反向注冊成功後,就意味着運作這個任務的環境已經準備好了,driver會結束SparkContext對象的初始化,也就意味着new SparkContext這句代碼運作完成

(16)、當初始化sc成功後,driver端就會繼續運作我們編寫的代碼,然後開始建立初始的RDD,然後進行一系列轉換操作,當遇到一個action算子時,也就意味着觸發了一個job

(17)、driver會将這個job送出給DAGScheduler

(18)、DAGScheduler将接受到的job,從最後一個算子向前推導,将DAG依據寬依賴劃分成一個一個的stage,然後将stage封裝成taskSet,并将taskSet中的task送出給TaskScheduler
通過TaskScheduler中的SchedulerBackend中的DriverActor派發任務到Executor節點中

(19)、DriverActor接受到DAGScheduler發送過來的task,會拿到一個序列化器,對task進行序列化,然後将序列化好的task封裝到LaunchTask中,然後将LaunchTask發送給指定的Executor

(20)、Executor接受到了DriverActor發送過來的LaunchTask時,會拿到一個反序列化器,對LaunchTask進行反序列化,封裝到TaskRunner中,然後從Executor中事先初始化好的線程池中擷取一個線程,将反序列化好的任務中的算子作用在RDD對應的分區上
           

聲明:

        文章中代碼及相關語句為自己根據相應了解編寫,文章中出現的相關圖檔為自己實踐中的截圖和相關技術對應的圖檔,若有相關異議,請聯系删除。感謝。轉載請注明出處,感謝。

By luoyepiaoxue2014

微網誌位址: http://weibo.com/luoyepiaoxue2014 點選打開連結

繼續閱讀