
My Complete Walkthrough of SparkContext, the Core of the Spark Source Code

Driver Program (SparkConf)  package org.apache.spark.SparkConf

Master        package org.apache.spark.deploy.master

SparkContext  package org.apache.spark.SparkContext

Stage         package org.apache.spark.scheduler.Stage

Task          package org.apache.spark.scheduler.Task  

DAGScheduler  package org.apache.spark.scheduler   

TaskScheduler package org.apache.spark.scheduler.TaskScheduler

TaskSchedulerImpl  package org.apache.spark.scheduler

Worker        package org.apache.spark.deploy.worker

Executor      package org.apache.spark.executor

BlockManager  package org.apache.spark.storage

TaskSet       package org.apache.spark.scheduler
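Before diving into the source, it helps to see where these classes show up in an ordinary job. Below is a minimal driver-program sketch (the app name and numbers are arbitrary): SparkConf configures the driver, constructing the SparkContext wires up the SchedulerBackend, TaskScheduler and DAGScheduler, and the final action walks the whole chain from job to Stage to TaskSet to Task on an Executor.

    import org.apache.spark.{SparkConf, SparkContext}

    object Walkthrough {
      def main(args: Array[String]): Unit = {
        // Driver Program: SparkConf carries the app name and master URL.
        val conf = new SparkConf().setAppName("walkthrough").setMaster("local")
        // Constructing SparkContext runs the initialization shown below:
        // it creates the SchedulerBackend, TaskScheduler and DAGScheduler.
        val sc = new SparkContext(conf)
        // An action submits a job: the DAGScheduler splits it into Stages,
        // each Stage becomes a TaskSet handed to the TaskScheduler, and the
        // Tasks finally run inside an Executor, backed by the BlockManager.
        val sum = sc.parallelize(1 to 100).map(_ * 2).reduce(_ + _)
        println(s"sum = $sum")
        sc.stop()
      }
    }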

// After initialization, creation of the schedulers begins:

// Create and start the scheduler

    val (sched, ts) = SparkContext.createTaskScheduler(this, master)

    _schedulerBackend = sched

    _taskScheduler = ts

    _dagScheduler = new DAGScheduler(this)

    _heartbeatReceiver.send(TaskSchedulerIsSet)
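The last line is worth pausing on: the HeartbeatReceiver is registered before createTaskScheduler runs, so once the TaskScheduler exists it has to be told via a TaskSchedulerIsSet message. Here is a hypothetical, self-contained sketch of that fire-and-forget pattern (this is not Spark's RpcEndpoint code, just the idea):

    sealed trait HeartbeatMessage
    case object TaskSchedulerIsSet extends HeartbeatMessage
    final case class Heartbeat(executorId: String) extends HeartbeatMessage

    // Hypothetical receiver: it is created before the TaskScheduler, so it
    // only starts forwarding executor heartbeats after being told that the
    // scheduler is now set.
    class HeartbeatReceiverSketch {
      @volatile private var schedulerReady = false

      def send(msg: HeartbeatMessage): Unit = msg match {
        case TaskSchedulerIsSet =>
          schedulerReady = true
        case Heartbeat(id) if schedulerReady =>
          println(s"forwarding heartbeat from executor $id to the TaskScheduler")
        case Heartbeat(id) =>
          println(s"dropping heartbeat from executor $id: TaskScheduler not set yet")
      }
    }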

/**

   * Create a task scheduler based on a given master URL.

   * Return a 2-tuple of the scheduler backend and the task scheduler.

   */

  private def createTaskScheduler(

      sc: SparkContext,

      master: String): (SchedulerBackend, TaskScheduler) = {

master match {

      case "local" =>

Instantiate a TaskSchedulerImpl:

val scheduler = new TaskSchedulerImpl(sc)

Build the masterUrls. Note that this and the following lines belong to the local-cluster[N, cores, memory] branch of the same match (where a LocalSparkCluster is started), not to the "local" case above:

val masterUrls = localCluster.start()

And the backend, said to be the really critical piece:

val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)

        scheduler.initialize(backend)

        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {

          localCluster.stop()

        }
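The whole of createTaskScheduler is one big pattern match on the master URL string. A simplified sketch of that dispatch technique follows; the regexes and cases here are illustrative, modeled on but not copied from SparkContext:

    // Illustrative regexes (not the exact ones in the Spark source):
    val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
    val LOCAL_CLUSTER_REGEX =
      """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*\]""".r

    def describeMaster(master: String): String = master match {
      case "local"                      => "one in-process executor thread"
      case LOCAL_N_REGEX(threads)       => s"$threads in-process executor threads"
      case LOCAL_CLUSTER_REGEX(n, c, m) => s"$n local workers, $c cores, ${m}MB each"
      case url if url.startsWith("spark://") => "standalone cluster master"
      case other                        => s"some other cluster manager: $other"
    }

    // e.g. describeMaster("local-cluster[2,2,1024]")
    //   => "2 local workers, 2 cores, 1024MB each"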
