
Spark DAG Source Code Analysis_2 | Study Notes

Study notes for the Developer Academy course "Big Data Real-Time Computing Framework Spark Quick Start: Spark DAG Source Code Analysis_2". The notes follow the course closely so that learners can pick up the material quickly.

Course link:

https://developer.aliyun.com/learning/course/100/detail/1674

Spark DAG Source Code Analysis_2

// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
    case s: ResultStage =>
      val job = s.activeJob.get
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
}

In the figure above, the rdd of Stage 2 refers to RDD F, the final RDD of the stage, and getPreferredLocs is asked for each of its missing partitions.
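As a rough illustration of how such a stage layout arises (a hedged sketch, assuming an existing SparkContext sc such as the spark-shell's; the path and variable names are made up, not from the course), a shuffle operator like reduceByKey splits the job into a ShuffleMapStage and a ResultStage whose stage.rdd plays the same role as F:

// Hypothetical lineage: reduceByKey introduces a shuffle boundary, so the job
// has a ShuffleMapStage (map side) and a ResultStage whose stage.rdd is `f`.
val f = sc.textFile("hdfs:///data/words.txt")  // hypothetical path
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)                          // shuffle boundary between stages
f.collect()  // submitting the job computes preferred locations for f's partitions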

// If the partition is cached, return the cache locations
val cached = getCacheLocs(rdd)(partition)
if (cached.nonEmpty) {
  return cached
}

Check whether the RDD is cached: if this partition of the RDD has been cached, return the cache locations directly so the task runs where the data already lives. If it is not cached:
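A minimal sketch of how this plays out from the API side (assuming an existing SparkContext sc; the path is hypothetical): once a partition has been materialized by an earlier job, getCacheLocs returns the executors holding the blocks, and later tasks on that partition are scheduled there:

// Hypothetical example: the second action benefits from the cache locations.
val lines = sc.textFile("hdfs:///data/input.txt")  // preferred locations come from HDFS blocks
val upper = lines.map(_.toUpperCase).cache()
upper.count()  // first job: no cache yet, falls through to the RDD's placement preferences
upper.count()  // second job: getCacheLocs is non-empty, tasks go to the caching executors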

// If the RDD has some placement preferences (as is the case for input RDDs), get those
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
if (rddPrefs.nonEmpty) {
  return rddPrefs.map(TaskLocation(_))
}

/**
 * Get the array of partitions of this RDD, taking into account whether the
 * RDD is checkpointed or not.
 */
final def partitions: Array[Partition] = {
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      partitions_ = getPartitions
    }
    partitions_
  }
}

protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
  val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
    .asInstanceOf[InputFormat[K, V]]
  newInputFormat match {
    case c: Configurable => c.setConf(conf)
    case _ =>
  }
  newInputFormat
}

override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf)
  val inputFormat = getInputFormat(jobConf)
  val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
  val array = new Array[Partition](inputSplits.size)
  for (i <- 0 until inputSplits.size) {
    array(i) = new HadoopPartition(id, i, inputSplits(i))
  }
  array
}

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
}

The hadoopFile method returns a new HadoopRDD.
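To tie it together, a small usage sketch (assuming an existing SparkContext sc; the path is hypothetical): textFile builds a HadoopRDD keyed by (LongWritable, Text) and maps each pair to the line text, and the number of partitions comes from TextInputFormat.getSplits as shown in getPartitions above:

val rdd = sc.textFile("hdfs:///data/input.txt", 4)  // minPartitions = 4, hypothetical path
// Internally: new HadoopRDD[LongWritable, Text](...).map(pair => pair._2.toString)
println(rdd.partitions.length)  // >= 4, one partition per InputSplit from getSplits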
