天天看點

Stage劃分和Task最佳位置算法

1.Stage劃分算法解密

Spark Application中可以因為不同的Action觸發衆多的Job,也就是說一個Application鐘可以有很多的Job,每個Job是由一個或者多個Stage構成的,後面的Stage依賴于前面的Stage,也就是說隻有前面依賴的Stage計算完畢後,後面的Stage才會運作

2.Stage劃分的已經就是寬依賴,什麼産生寬依賴?例如reduceByKey,groupByKey等等

3.由Action(例如Collect)導緻了SparkContext.runJob的執行,最終導緻了DAGScheduler中的submitJob的執行,其核心是通過發送一個case class JobSubmitted對象給eventProcessLoop,其中JobSubmitted代碼如下:

private[scheculer] case class JobSubmitted(
jobId:Int,
finalRDD:RDD[_]
func:(TaskContext,Iterator[_]=>_,
partitions:Array[Int],
callSite:CallSite,
listener:JobListener,
properties:Properties=null)
extends DAGSchedulerEvent
           

eventProcessLoop是DAGSchedulerEventProcessLoop的具體執行個體,而DAGSchedulerEventProcessLoop是EventLoop的子類,具體實作EventLoop的onReceive方法,onReceive方法轉過來回調doOnReceive

4.在doOnreceive中通過模式比對的方式把執行路由到

case JobSubmitted(jobId,rdd,func,partitions,callSite,listener,properties)=>.dagScheduler.handleJobSubmitted(jobId,rdd,func,partitons,callSite,listener,properties)
           

5.在handleJobSubmited中首先建立finalStage,建立finalStage時候會建立父Stage的依賴鍊條:

補充說明:所謂的missing就是說要進行目前的計算了

二、Task任務本地性算法實作:

1.在submiMissingTasks中會通過調用以下代碼來獲得任務的本地性:

2.

val taskIdToLocations:Map[Int,Seq[TaskLocation]]=try{
stage match{
case s: ShuffleMapStage =>.
partionsToCompute.map{id,getPreferredLocs(stage.rdd,id))}.toMap
case s:ResultStage =>.
val job=s.activeJob.get
partitionsToCompute.map{id =>
val p =s.partitions(id)
(id,getPreferredLocs(stage.rdd,p))
}.toMap
}
}
           

2.具體一個Partition中的資料本地性的算法實作為下述代碼:

private[spark]
def getPreferredLocs(rdd:RDD[_],partition:Int):Seq[TaskLocation]={getPreferredLocsInternal(rdd,partition,new HashSet)}
           

在具體算法實作的時候首先查詢DAGScheduler的記憶體資料結構中是否存在目前Partition的資料本地性的資訊,如果有的話直接傳回,如果沒有首先會調用rdd.getPreferredLocations

例如想讓Spark運作在HBase上或者一種現在還沒有直接的資料庫上面,此時開發者需要自定義RDD,為了保證Task計算的資料本地性,最為關鍵的方式就是必須實作RDD的getPreferredLocations

3.DAGScheduler計算資料本地性的時候巧妙的借助了RDD自身的getPreferedLocations中的資料,最大化的優化的效率,因為getPreferredLocations中表明了每個Partition的資料本地性,雖然目前Partition可能被persist或者checkpoint,但是persist或者checkpoint預設情況下肯定是和getPreferredLocations中的Patition的資料本地性一緻的,是以這就是極大的簡化Task資料本地性算法的實作和效率的優化。

繼續閱讀