天天看点

spark源码学习(一)---sparkContext(1)

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
           

一个sparkContext代表了一个连接到spark cluster的一个connection。sparkContext用来创建RDD,accumulators和broadcast变量,每个jvm只有一个sparkContext,这个限制后面可能不再适用。参见SPARK-2243。在我们的程序中通常构造一个sparkConf,然后用该sparkConf来构造SparkContext,任何对sparkconf的设置将会覆盖默认的设置。SparkContext具有多个构造函数.

SparkContext类中最重要的一个函数是createTaskScheduler方法,该方法获得我们提交任务时传入的参数,比如将master 值设为local,则该方法执行case “local”下的代码。方法返回值是(backend,scheduler),scheduler是SchedulerImpl对象,就是所说的TaskScheduler,负责任务的调度。而backend根据不同的运行方式返回的具体类型也不同。比如如果是local模式,则返回一个LocalBackend,如果是yarn-cluster,则返回CoarseGrainedSchedulerBackend。该backend实际上是一个非常重要的一个类,后面会负责注册到master,executor的注册,task发送给executor等,都是由该组件完成。注意我们下面代码红色的部分,在backend初始化后,scheduler回调用scheduler.initialize(backend)这个方法。

private def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, 1)
        <span style="color:#ff0000;">scheduler.initialize(backend)</span>
        (backend, scheduler)

      case LOCAL_N_REGEX(threads) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*, M] means the number of cores on the computer with M failures
        // local[N, M] means exactly N threads with M failures
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
              memoryPerSlaveInt, sc.executorMemory))
        }

        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
        val masterUrls = localCluster.start()
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)

      case "yarn-standalone" | "yarn-cluster" =>
        if (master == "yarn-standalone") {
          logWarning(
            "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
        }
        val scheduler = try {
          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
        } catch {
          // TODO: Enumerate the exact reasons why it can fail
          // But irrespective of it, it means we cannot proceed !
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }
        val backend = try {
          val clazz =
            Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }
        scheduler.initialize(backend)
        (backend, scheduler)

      case "yarn-client" =>
        val scheduler = try {
          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }

        val backend = try {
          val clazz =
            Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }

        scheduler.initialize(backend)
        (backend, scheduler)

      case MESOS_REGEX(mesosUrl) =>
        MesosNativeLibrary.load()
        val scheduler = new TaskSchedulerImpl(sc)
        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
        val backend = if (coarseGrained) {
          new CoarseMesosSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
        } else {
          new MesosSchedulerBackend(scheduler, sc, mesosUrl)
        }
        scheduler.initialize(backend)
        (backend, scheduler)

      case SIMR_REGEX(simrUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
        scheduler.initialize(backend)
        (backend, scheduler)

      case zkUrl if zkUrl.startsWith("zk://") =>
        logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +
          "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
        createTaskScheduler(sc, "mesos://" + zkUrl)

      case _ =>
        throw new SparkException("Could not parse Master URL: '" + master + "'")
    }
  }
}
           

scheduler的initialize方法将会创建一个调度池。该调度池具有优先策略,如fifo,fair等。下面是TaskSchedulerImpl类的initialize方法:

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty
	
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
      }
    }
    schedulableBuilder.buildPools()
  }
           

sparkContext在创建好TaskScheduler后会调用scheduler的start()方法:

// Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()
           

下面是TaskSchedulerImpl类的start方法:可以看出该start调用了(SparkDeploySchedulerBackend)backend的start方法。

override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }
           

backend的start会创建一个Appclient对象,Appclient会创建一个ClientActor,ClientActor会调用registerMaster方法来注册master。APPclient是一个APP连接cluster的接口,需要一个master的URL,一个APP的description和一个event的listener,当各种event出现的时候,会回调listener。除了taskScheduler外还有一个DAGScheduler,DAGScheduler包含了一个DAGSchedulerEventProcessActor,底层基于该组件通信。DAGScheduler在执行createTaskscheduler后被创建。

// Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()
           

还有个sparkUI,访问4040端口,会显示应用程序的运行状态。sparkUI的创建基于jetty服务器来提供服务。

总结:

sparkContext包含最重要的三个组件:SchedulerBackend、TaskScheduler、DAGScheduler。还有个sparkUI。