天天看點

9. Spark源碼解析之Yarn Cluster模式啟動流程源碼解析

這裡解讀當sparksubmit送出模式為Yarn Cluster模式時的啟動流程。 

SparkSubmit類的runMain()中執行到start()時,本地模式會進入本地送出的--class類的main中開始執行。

// Start the application instance with the child arguments and the Spark configuration
      app.start(childArgs.toArray, sparkConf)
           

 而Yarn Cluster模式,在prepareSubmitEnvironment()中準備運作環境時有判斷過,是以start()其實調用的是org.apache.spark.deploy.yarn.YarnClusterApplication類的start()。

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
    // yarn-cluster模式,使用yarn.client作為使用者送出類的包裝執行器
    if (isYarnCluster) {
      // object SparkSubmit中有定義為"org.apache.spark.deploy.yarn.YarnClusterApplication"
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
     
       ...

      }
 
      // 周遊所有args參數,添加到子類參數中
      if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
      }
    }
           

YarnClusterApplication

YarnClusterApplication類與Client類同屬org.apache.spark.deploy.yarn包。它的start()先從SparkConf中移除spark.jars和spark.files(Yarn模式改用Yarn緩存來分發jars和檔案),然後建構Client執行個體并通過Client類的run()運作。

/**
 * [[SparkApplication]] implementation whose `start()` hands the submission over to [[Client]].
 */
private[spark] class YarnClusterApplication extends SparkApplication {

  /**
   * Removes the `spark.jars` / `spark.files` entries from `conf`, parses `args` into
   * [[ClientArguments]] and runs a [[Client]] built from both.
   *
   * @param args arguments forwarded to [[ClientArguments]]
   * @param conf Spark configuration; mutated in place by the two removals
   */
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    Seq("spark.jars", "spark.files").foreach(key => conf.remove(key))

    // Arguments are parsed first, then the client is constructed and run.
    val clientArgs = new ClientArguments(args)
    val client = new Client(clientArgs, conf)
    client.run()
  }

}
           

ClientArguments

ClientArguments負責解析傳入的參數:--jar、--class、--primary-py-file、--primary-r-file以及可重複出現的--arg。

// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
/**
 * Parses the flag/value pairs passed to the YARN client.
 *
 * Recognised flags are `--jar`, `--class`, `--primary-py-file`, `--primary-r-file` and the
 * repeatable `--arg`. Parsing happens eagerly in the constructor.
 *
 * @param args raw arguments, as alternating flag and value entries
 * @throws IllegalArgumentException on an unrecognised or value-less flag, or when both a primary
 *                                  Python file and a primary R file are given
 */
private[spark] class ClientArguments(args: Array[String]) {

  var userJar: String = null
  var userClass: String = null
  var primaryPyFile: String = null
  var primaryRFile: String = null
  var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()

  parseArgs(args.toList)

  /** Consumes every flag/value pair, then validates the combination of primary files. */
  private def parseArgs(inputArgs: List[String]): Unit = {
    consume(inputArgs)

    // A primary Python file and a primary R file are mutually exclusive.
    if (primaryPyFile != null && primaryRFile != null) {
      throw new IllegalArgumentException("Cannot have primary-py-file and primary-r-file" +
        " at the same time")
    }
  }

  /** Handles the leading flag/value pair of `remaining` and recurses on the rest. */
  @scala.annotation.tailrec
  private def consume(remaining: List[String]): Unit = remaining match {
    case Nil =>

    case "--jar" :: value :: rest =>
      userJar = value
      consume(rest)

    case "--class" :: value :: rest =>
      userClass = value
      consume(rest)

    case "--primary-py-file" :: value :: rest =>
      primaryPyFile = value
      consume(rest)

    case "--primary-r-file" :: value :: rest =>
      primaryRFile = value
      consume(rest)

    case "--arg" :: value :: rest =>
      userArgs += value
      consume(rest)

    case _ =>
      // Unknown flag, or a known flag with no value left to read.
      throw new IllegalArgumentException(getUsageMessage(remaining))
  }

  /** Builds the usage text, prefixed with the offending arguments when they are supplied. */
  private def getUsageMessage(unknownParam: List[String] = null): String = {
    val prefix = Option(unknownParam).fold("")(p => s"Unknown/unsupported param $p\n")
    val usage =
      s"""
      |Usage: org.apache.spark.deploy.yarn.Client [options]
      |Options:
      |  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster
      |                           mode)
      |  --class CLASS_NAME       Name of your application's main class (required)
      |  --primary-py-file        A main Python file
      |  --primary-r-file         A main R file
      |  --arg ARG                Argument to be passed to your application's main class.
      |                           Multiple invocations are possible, each will be passed in order.
      """.stripMargin
    prefix + usage
  }
}
           

Client

直接進入Client的run()。

private[spark] class Client(
    val args: ClientArguments,
    val sparkConf: SparkConf)
  extends Logging {
    ...

      /**
   * Submit an application to the ResourceManager.
   * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
   * reporting the application's status until the application has exited for any reason.
   * Otherwise, the client process will exit after submission.
   * If the application finishes with a failed, killed, or undefined status,
   * throw an appropriate SparkException.
   */
  // Submit the application, then either check its state once or monitor it until it ends.
  def run(): Unit = {
    // The id returned by the submission is kept on this client for the status calls below.
    this.appId = submitApplication()

    val checkOnce = !launcherBackend.isConnected() && fireAndForget
    if (checkOnce) {
      // Single report: only a FAILED or KILLED state is treated as an error here.
      val report = getApplicationReport(appId)
      val state = report.getYarnApplicationState
      logInfo(s"Application report for $appId (state: $state)")
      logInfo(formatReportDetails(report))
      val endedBadly =
        state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED
      if (endedBadly) {
        throw new SparkException(s"Application $appId finished with status: $state")
      }
    } else {
      // Monitor the application and translate its outcome into an exception when it is
      // failed, killed or undefined.
      monitorApplication(appId) match {
        case YarnAppReport(appState, finalState, diags) =>
          val failed =
            appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED
          if (failed) {
            diags.foreach { err =>
              logError(s"Application diagnostics message: $err")
            }
            throw new SparkException(s"Application $appId finished with failed status")
          }
          val killed =
            appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED
          if (killed) {
            throw new SparkException(s"Application $appId is killed")
          }
          if (finalState == FinalApplicationStatus.UNDEFINED) {
            throw new SparkException(s"The final status of application $appId is undefined")
          }
      }
    }
  }
}
           

submitApplication()

看看送出app擷取id的過程。

/**
 * Requests a new application from the ResourceManager, builds the AM launch and submission
 * contexts, submits the application and returns its id.
 *
 * If anything throws after an application id has been obtained, the staging directory for
 * that id is cleaned up before the exception is rethrown.
 *
 * @return the id of the submitted application
 */
def submitApplication(): ApplicationId = {
    var appId: ApplicationId = null
    try {
      // Connect the launcher backend
      launcherBackend.connect()
      // Initialize the YARN client with the Hadoop configuration
      yarnClient.init(hadoopConf)
      // Start the YARN client
      yarnClient.start()

      // Log the number of NodeManagers reported by the cluster metrics
      logInfo("Requesting a new application from cluster with %d NodeManagers"
        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

      // Get a new application from our RM
      val newApp = yarnClient.createApplication()
      // Response returned for the new-application request
      val newAppResponse = newApp.getNewApplicationResponse()
      // Application id carried by that response
      appId = newAppResponse.getApplicationId()

      // Build a CallerContext tagged "CLIENT" (with the configured upstream context and the
      // app id) and set it as the current caller context
      new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
        Option(appId.toString)).setCurrentContext()

      // Verify whether the cluster has enough resources for our AM
      verifyClusterResources(newAppResponse)

      // Set up the appropriate contexts to launch our AM
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)

      // Finally, submit and monitor the application
      logInfo(s"Submitting application $appId to ResourceManager")
      // Submit the application described by appContext
      yarnClient.submitApplication(appContext)
      // Report the app id and the SUBMITTED state through the launcher backend
      launcherBackend.setAppId(appId.toString)
      reportLauncherState(SparkAppHandle.State.SUBMITTED)

      // Return the application id
      appId
    } catch {
      case e: Throwable =>
        // Clean up the staging directory only when an app id was already assigned, then rethrow
        if (appId != null) {
          cleanupStagingDir(appId)
        }
        throw e
    }
  }
           

一步步解讀上面的過程。

launcherBackend.connect()

launcherBackend是建立了LauncherBackend類的執行個體,這個類主要是用于與launcherServer通訊。

// Launcher backend for this client: backed by sparkConf, with a stop handler that either kills
// the YARN application or marks the handle as KILLED.
private val launcherBackend = new LauncherBackend() {
    override protected def conf: SparkConf = sparkConf

    override def onStopRequest(): Unit = {
      val killViaYarn = isClusterMode && appId != null
      if (!killViaYarn) {
        // Not cluster mode, or no application id yet: report KILLED and stop.
        setState(SparkAppHandle.State.KILLED)
        stop()
      } else {
        // Cluster mode with a known application id: ask YARN to kill the application.
        yarnClient.killApplication(appId)
      }
    }
  }
           

yarnClient.init(hadoopConf)

實際是通過YarnClientImpl.class擷取的YarnClient執行個體。

同樣在Client類中:

// YARN client obtained from the YarnClient.createYarnClient factory method
private val yarnClient = YarnClient.createYarnClient
           

 YarnClient類:

public abstract class YarnClient extends AbstractService {
  /**
   * Create a new instance of YarnClient.
   *
   * @return a newly constructed {@link YarnClientImpl}
   */
  @Public
  public static YarnClient createYarnClient() {
    return new YarnClientImpl();
  }
    ...
}
           

YarnClientImpl類: 

public class YarnClientImpl extends YarnClient {
    ...
  /** Constructs the client, passing this class's name to the superclass constructor. */
  public YarnClientImpl() {
    super(YarnClientImpl.class.getName());
  }

    ...
}
           

yarnClient.init() 和 start()

 初始化方法就是調用的yarnClient繼承的AbstractService類的init()和start(),主要是對狀态的判斷。

public abstract class AbstractService implements Service {
    ...

  /**
   * {@inheritDoc}
   * This invokes {@link #serviceInit}
   * @param conf the configuration of the service. This must not be null
   * @throws ServiceStateException if the configuration was null,
   * the state change not permitted, or something else went wrong
   */
  @Override
  public void init(Configuration conf) {
    if (conf == null) {
      throw new ServiceStateException("Cannot initialize service "
                                      + getName() + ": null configuration");
    }
    // Already in the INITED state: nothing to do
    if (isInState(STATE.INITED)) {
      return;
    }
    synchronized (stateChangeLock) {
      // Only run the initialization when enterState() reports a state other than INITED
      if (enterState(STATE.INITED) != STATE.INITED) {
        setConfig(conf);
        try {
          // Run the service-specific initialization
          serviceInit(config);
          if (isInState(STATE.INITED)) {
            //if the service ended up here during init,
            //notify the listeners
            notifyListeners();
          }
        } catch (Exception e) {
          // Record the failure, stop the service quietly, and rethrow as a ServiceStateException
          noteFailure(e);
          ServiceOperations.stopQuietly(LOG, this);
          throw ServiceStateException.convert(e);
        }
      }
    }
  }

  /**
   * {@inheritDoc}
   * @throws ServiceStateException if the current service state does not permit
   * this action
   */
  @Override
  public void start() {
    // Already in the STARTED state: nothing to do
    if (isInState(STATE.STARTED)) {
      return;
    }
    //enter the started state
    synchronized (stateChangeLock) {
      // Only run the start logic when enterState() reports a state other than STARTED
      if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
        try {
          startTime = System.currentTimeMillis();
          // Run the service-specific start logic
          serviceStart();
          if (isInState(STATE.STARTED)) {
            //if the service started (and isn't now in a later state), notify
            if (LOG.isDebugEnabled()) {
              LOG.debug("Service " + getName() + " is started");
            }
            notifyListeners();
          }
        } catch (Exception e) {
          // Record the failure, stop the service quietly, and rethrow as a ServiceStateException
          noteFailure(e);
          ServiceOperations.stopQuietly(LOG, this);
          throw ServiceStateException.convert(e);
        }
      }
    }
  }

    ...
}
           

yarnClient.getYarnClusterMetrics.getNumNodeManagers 

擷取節點數量

public abstract class YarnClient extends AbstractService {
    ...

  /**
   * <p>
   * Get metrics ({@link YarnClusterMetrics}) about the cluster.
   * </p>
   * 
   * @return cluster metrics
   * @throws YarnException
   * @throws IOException
   */
  public abstract YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
      IOException;

    ...
}
           

 getNumNodeManagers 

這個Yarn節點數量在初始化Yarn叢集時就已經通過Metric測量系統擷取,這個後續再解讀。

/**
 * <p><code>YarnClusterMetrics</code> represents cluster metrics.</p>
 * 
 * <p>Currently only number of <code>NodeManager</code>s is provided.</p>
 */
@Public
@Stable
public abstract class YarnClusterMetrics {

  /**
   * Creates a metrics record through {@code Records.newRecord} and stores the given count in it.
   *
   * @param numNodeManagers number of <code>NodeManager</code>s to record
   * @return the populated metrics record
   */
  @Private
  @Unstable
  public static YarnClusterMetrics newInstance(int numNodeManagers) {
    YarnClusterMetrics record = Records.newRecord(YarnClusterMetrics.class);
    record.setNumNodeManagers(numNodeManagers);
    return record;
  }

  /**
   * Get the number of <code>NodeManager</code>s in the cluster.
   * @return number of <code>NodeManager</code>s in the cluster
   */
  @Public
  @Stable
  public abstract int getNumNodeManagers();

  /**
   * Set the number of <code>NodeManager</code>s in the cluster.
   * @param numNodeManagers number of <code>NodeManager</code>s in the cluster
   */
  @Private
  @Unstable
  public abstract void setNumNodeManagers(int numNodeManagers);

}
           

 傳回Client中繼續往下,送出app到RM

      // Create a new application through the YARN client
      val newApp = yarnClient.createApplication()
      // Response returned for the new-application request
      val newAppResponse = newApp.getNewApplicationResponse()
      // Application id carried by that response
      appId = newAppResponse.getApplicationId()
           

yarnClient.createApplication(),在YarnClient類下。

public abstract YarnClientApplication createApplication()
      throws YarnException, IOException;
           

 YarnClientApplication 

主要是app的上下文資訊。

/**
 * Immutable holder pairing the new-application response with the submission context
 * that belongs to the same application.
 */
public class YarnClientApplication {
  private final GetNewApplicationResponse response;
  private final ApplicationSubmissionContext submissionContext;

  /**
   * @param newAppResponse response to the new-application request
   * @param appContext submission context for the application
   */
  public YarnClientApplication(GetNewApplicationResponse newAppResponse,
                               ApplicationSubmissionContext appContext) {
    this.response = newAppResponse;
    this.submissionContext = appContext;
  }

  /** @return the response passed to the constructor */
  public GetNewApplicationResponse getNewApplicationResponse() {
    return response;
  }

  /** @return the submission context passed to the constructor */
  public ApplicationSubmissionContext getApplicationSubmissionContext() {
    return submissionContext;
  }
}
           

GetNewApplicationResponse 

在這裡getApplicationId擷取appId。

/**
 * Record describing a newly allocated application: its {@link ApplicationId} and the
 * maximum resource capability.
 */
public abstract class GetNewApplicationResponse {

  /**
   * Creates a response record through {@code Records.newRecord} and fills in the application
   * id and the maximum capability.
   *
   * @param applicationId id to store in the response
   * @param minCapability accepted but not stored anywhere by this method
   * @param maxCapability maximum resource capability to store in the response
   * @return the populated response record
   */
  @Private
  @Unstable
  public static GetNewApplicationResponse newInstance(
      ApplicationId applicationId, Resource minCapability,
      Resource maxCapability) {
    GetNewApplicationResponse record =
        Records.newRecord(GetNewApplicationResponse.class);
    record.setApplicationId(applicationId);
    record.setMaximumResourceCapability(maxCapability);
    return record;
  }

  /**
   * Get the <em>new</em> <code>ApplicationId</code> allocated by the 
   * <code>ResourceManager</code>.
   * @return <em>new</em> <code>ApplicationId</code> allocated by the 
   *          <code>ResourceManager</code>
   */
  @Public
  @Stable
  public abstract ApplicationId getApplicationId();

  /**
   * Set the <code>ApplicationId</code> of this response.
   * @param applicationId the application id to store
   */
  @Private
  @Unstable
  public abstract void setApplicationId(ApplicationId applicationId);

  /**
   * Get the maximum capability for any {@link Resource} allocated by the 
   * <code>ResourceManager</code> in the cluster.
   * @return maximum capability of allocated resources in the cluster
   */
  @Public
  @Stable
  public abstract Resource getMaximumResourceCapability();

  /**
   * Set the maximum resource capability of this response.
   * @param capability the maximum capability to store
   */
  @Private
  @Unstable
  public abstract void setMaximumResourceCapability(Resource capability); 
}
           

ApplicationId 

public abstract class ApplicationId implements Comparable<ApplicationId> {

  @Private
  @Unstable
  public static final String appIdStrPrefix = "application_";

  /**
   * Creates an id record through {@code Records.newRecord}, sets the cluster timestamp and
   * the numeric id, and calls {@code build()} on it before returning it.
   *
   * @param clusterTimestamp cluster timestamp component of the id
   * @param id numeric component of the id
   * @return the built application id
   */
  @Private
  @Unstable
  public static ApplicationId newInstance(long clusterTimestamp, int id) {
    ApplicationId record = Records.newRecord(ApplicationId.class);
    record.setClusterTimestamp(clusterTimestamp);
    record.setId(id);
    record.build();
    return record;
  }
    ...
}
           

繼續傳回Client

// Build a CallerContext tagged "CLIENT" (with the configured upstream context and the app id)
// and set it as the current caller context
      new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
        Option(appId.toString)).setCurrentContext()
           
/**
 * An utility class used to set up Spark caller contexts to HDFS and Yarn. The `context` will be
 * constructed by parameters passed in.
 * When Spark applications run on Yarn and HDFS, its caller contexts will be written into Yarn RM
 * audit log and hdfs-audit.log. That can help users to better diagnose and understand how
 * specific applications impacting parts of the Hadoop system and potential problems they may be
 * creating (e.g. overloading NN). As HDFS mentioned in HDFS-9184, for a given HDFS operation, it's
 * very helpful to track which upper level job issues it.
 *
 * @param from who sets up the caller context (TASK, CLIENT, APPMASTER)
 *
 * The parameters below are optional:
 * @param upstreamCallerContext caller context the upstream application passes in
 * @param appId id of the app this task belongs to
 * @param appAttemptId attempt id of the app this task belongs to
 * @param jobId id of the job this task belongs to
 * @param stageId id of the stage this task belongs to
 * @param stageAttemptId attempt id of the stage this task belongs to
 * @param taskId task id
 * @param taskAttemptNumber task attempt id
 */
private[spark] class CallerContext(
  from: String,
  upstreamCallerContext: Option[String] = None,
  appId: Option[String] = None,
  appAttemptId: Option[String] = None,
  jobId: Option[Int] = None,
  stageId: Option[Int] = None,
  stageAttemptId: Option[Int] = None,
  taskId: Option[Long] = None,
  taskAttemptNumber: Option[Int] = None) extends Logging {

  // "SPARK_<from>" followed by one prefixed segment for every field that is defined, in the
  // order listed here; the result is length-limited by prepareContext.
  private val context = prepareContext(
    Seq(
      Some("SPARK_" + from),
      appId.map("_" + _),
      appAttemptId.map("_" + _),
      jobId.map("_JId_" + _),
      stageId.map("_SId_" + _),
      stageAttemptId.map("_" + _),
      taskId.map("_TId_" + _),
      taskAttemptNumber.map("_" + _),
      upstreamCallerContext.map("_" + _)
    ).flatten.mkString)

  /**
   * Returns `context` unchanged when it is null or no longer than the configured
   * `hadoop.caller.context.max.size`; otherwise logs a warning and returns the truncated prefix.
   */
  private def prepareContext(context: String): String = {
    if (context == null) {
      context
    } else {
      // The default max size of Hadoop caller context is 128
      val maxLen = SparkHadoopUtil.get.conf.getInt("hadoop.caller.context.max.size", 128)
      if (context.length > maxLen) {
        val truncated = context.substring(0, maxLen)
        logWarning(s"Truncated Spark caller context from $context to $truncated")
        truncated
      } else {
        context
      }
    }
  }

  /**
   * Set up the caller context [[context]] by invoking Hadoop CallerContext API of
   * [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 2.8.
   *
   * The Hadoop classes are looked up reflectively; any non-fatal failure is logged as a
   * warning and otherwise ignored.
   */
  def setCurrentContext(): Unit = {
    if (CallerContext.callerContextSupported) {
      try {
        val contextClass = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
        val builderClass = Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
        val builderInstance = builderClass.getConstructor(classOf[String]).newInstance(context)
        val builtContext = builderClass.getMethod("build").invoke(builderInstance)
        contextClass.getMethod("setCurrent", contextClass).invoke(null, builtContext)
      } catch {
        case NonFatal(e) =>
          logWarning("Fail to set Spark caller context", e)
      }
    }
  }
}
           

往下,同樣在Client中

// Verify whether the cluster has enough resources for our AM
      verifyClusterResources(newAppResponse)
           
/**
   * Fail fast if we have requested more resources per container than is available in the cluster.
   *
   * Both the total executor memory (heap + overhead + PySpark worker memory) and the total AM
   * memory (heap + overhead) are compared against the maximum container memory reported in
   * `newAppResponse`.
   *
   * @param newAppResponse response to the new-application request, carrying the maximum
   *                       resource capability
   * @throws IllegalArgumentException if either total exceeds the maximum
   */
  private def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {

    // Maximum memory reported for a single container
    val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
    logInfo("Verifying our application has not requested more than the maximum " +
      s"memory capability of the cluster ($maxMem MB per container)")

    // Total memory requested per executor container
    val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
    if (executorMem > maxMem) {
      // Every figure in the message carries its unit so the values can be compared directly.
      throw new IllegalArgumentException(s"Required executor memory ($executorMemory MB), overhead " +
        s"($executorMemoryOverhead MB), and PySpark memory ($pysparkWorkerMemory MB) is above " +
        s"the max threshold ($maxMem MB) of this cluster! Please check the values of " +
        s"'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.")
    }

    // Total memory requested for the AM container
    val amMem = amMemory + amMemoryOverhead
    if (amMem > maxMem) {
      throw new IllegalArgumentException(s"Required AM memory ($amMemory" +
        s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
        "Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " +
        "'yarn.nodemanager.resource.memory-mb'.")
    }
    logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
      amMem,
      amMemoryOverhead))

    // We could add checks to make sure the entire cluster has enough resources but that involves
    // getting all the node reports and computing ourselves.
  }
           
executorMemory               spark.executor.memory  預設1g
executorMemoryOverhead       spark.executor.memoryOverhead,未設定時為 max(384M, 0.10*spark.executor.memory)
amMemory          yarn-cluster模式,由driver決定,spark.driver.memory 預設1g 
                  yarn-client模式,spark.yarn.am.memory 預設512m
amMemoryOverhead  yarn-cluster模式,由driver決定,spark.driver.memoryOverhead,未設定時為 max(384M, 0.10*spark.driver.memory)
                  yarn-client模式,spark.yarn.am.memoryOverhead,未設定時為 max(384M, 0.10*spark.yarn.am.memory)
           

containerContext 

// Set up the appropriate contexts to launch our AM: the container launch context first, then
// the application submission context that wraps it
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)
           

createContainerLaunchContext() 

/**
   * Set up a ContainerLaunchContext to launch our ApplicationMaster container.
   * This sets up the launch environment, java options, and the command for launching the AM.
   *
   * @param newAppResponse response to the new-application request; its application id is used
   *                       for the staging directory and for app-id substitution in java options
   * @return the populated launch context for the AM container
   */
  private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
    : ContainerLaunchContext = {
    logInfo("Setting up container launch context for our AM")
    val appId = newAppResponse.getApplicationId
    val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
    // PySpark archives are only looked up for Python applications
    val pySparkArchives =
      if (sparkConf.get(IS_PYTHON_APP)) {
        findPySparkArchives()
      } else {
        Nil
      }

    // Build the launch environment variables
    val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
    // Prepare the local resources
    val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)

    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setLocalResources(localResources.asJava)
    amContainer.setEnvironment(launchEnv.asJava)

    val javaOpts = ListBuffer[String]()

    // Set the environment variable through a command prefix
    // to append to the existing value of the variable
    var prefixEnv: Option[String] = None

    // Add Xmx for AM memory
    javaOpts += "-Xmx" + amMemory + "m"

    val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
    javaOpts += "-Djava.io.tmpdir=" + tmpDir

    // TODO: Remove once cpuset version is pushed out.
    // The context is, default gc for server class machines ends up using all cores to do gc -
    // hence if there are multiple containers in same node, Spark GC affects all other containers'
    // performance (which can be that of other Spark containers)
    // Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
    // multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
    // of cores on a node.
    // The CMS flags below are only added when SPARK_USE_CONC_INCR_GC is present in the launch
    // environment and parses to true; when it is absent they are not added.
    val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
    if (useConcurrentAndIncrementalGC) {
      // In our expts, using (default) throughput collector has severe perf ramifications in
      // multi-tenant machines
      javaOpts += "-XX:+UseConcMarkSweepGC"
      javaOpts += "-XX:MaxTenuringThreshold=31"
      javaOpts += "-XX:SurvivorRatio=8"
      javaOpts += "-XX:+CMSIncrementalMode"
      javaOpts += "-XX:+CMSIncrementalPacing"
      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
      javaOpts += "-XX:CMSIncrementalDutyCycle=10"
    }

    // Include driver-specific java options if we are launching a driver
    if (isClusterMode) {
      sparkConf.get(DRIVER_JAVA_OPTIONS).foreach { opts =>
        javaOpts ++= Utils.splitCommandString(opts)
          .map(Utils.substituteAppId(_, appId.toString))
          .map(YarnSparkHadoopUtil.escapeForShell)
      }
      val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH),
        sys.props.get("spark.driver.libraryPath")).flatten
      if (libraryPaths.nonEmpty) {
        prefixEnv = Some(createLibraryPathPrefix(libraryPaths.mkString(File.pathSeparator),
          sparkConf))
      }
      // AM-specific java options are ignored in cluster mode; warn if they were set
      if (sparkConf.get(AM_JAVA_OPTIONS).isDefined) {
        logWarning(s"${AM_JAVA_OPTIONS.key} will not take effect in cluster mode")
      }
    } else {
      // Validate and include yarn am specific java options in yarn-client mode.
      sparkConf.get(AM_JAVA_OPTIONS).foreach { opts =>
        if (opts.contains("-Dspark")) {
          val msg = s"${AM_JAVA_OPTIONS.key} is not allowed to set Spark options (was '$opts')."
          throw new SparkException(msg)
        }
        if (opts.contains("-Xmx")) {
          val msg = s"${AM_JAVA_OPTIONS.key} is not allowed to specify max heap memory settings " +
            s"(was '$opts'). Use spark.yarn.am.memory instead."
          throw new SparkException(msg)
        }
        javaOpts ++= Utils.splitCommandString(opts)
          .map(Utils.substituteAppId(_, appId.toString))
          .map(YarnSparkHadoopUtil.escapeForShell)
      }
      sparkConf.get(AM_LIBRARY_PATH).foreach { paths =>
        prefixEnv = Some(createLibraryPathPrefix(paths, sparkConf))
      }
    }

    // For log4j configuration to reference
    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    // The user class is only passed in cluster mode
    val userClass =
      if (isClusterMode) {
        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
      } else {
        Nil
      }
    val userJar =
      if (args.userJar != null) {
        Seq("--jar", args.userJar)
      } else {
        Nil
      }
    // Only the file name of the primary Python file is passed, and only in cluster mode
    val primaryPyFile =
      if (isClusterMode && args.primaryPyFile != null) {
        Seq("--primary-py-file", new Path(args.primaryPyFile).getName())
      } else {
        Nil
      }
    val primaryRFile =
      if (args.primaryRFile != null) {
        Seq("--primary-r-file", args.primaryRFile)
      } else {
        Nil
      }
    // ApplicationMaster in cluster mode, ExecutorLauncher otherwise
    val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }
    // For a primary file ending in ".R", prepend that file to the user arguments
    if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
      args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
    }
    val userArgs = args.userArgs.flatMap { arg =>
      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
    }

    // All arguments passed to the AM
    val amArgs =
      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
      Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))

    // Command for the ApplicationMaster
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
    amContainer.setCommands(printableCommands.asJava)

    logDebug("===============================================================================")
    logDebug("YARN AM launch context:")
    logDebug(s"    user class: ${Option(args.userClass).getOrElse("N/A")}")
    logDebug("    env:")
    // The environment is passed through Utils.redact before being logged
    if (log.isDebugEnabled) {
      Utils.redact(sparkConf, launchEnv.toSeq).foreach { case (k, v) =>
        logDebug(s"        $k -> $v")
      }
    }
    logDebug("    resources:")
    localResources.foreach { case (k, v) => logDebug(s"        $k -> $v")}
    logDebug("    command:")
    logDebug(s"        ${printableCommands.mkString(" ")}")
    logDebug("===============================================================================")

    // send the acl settings into YARN to control who has access via YARN interfaces
    val securityManager = new SecurityManager(sparkConf)
    amContainer.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
    setupSecurityToken(amContainer)
    amContainer
  }
           

 createApplicationSubmissionContext()

設定AM的上下文

/**
   * Set up the context for submitting our ApplicationMaster.
   * This uses the YarnClientApplication not available in the Yarn alpha API.
   *
   * @param newApp the application handle whose submission context is filled in
   * @param containerContext launch context to use as the AM container spec
   * @return the submission context obtained from `newApp`, populated from the Spark configuration
   */
  def createApplicationSubmissionContext(
      newApp: YarnClientApplication,
      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
    val submissionContext = newApp.getApplicationSubmissionContext
    submissionContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
    submissionContext.setQueue(sparkConf.get(QUEUE_NAME))
    submissionContext.setAMContainerSpec(containerContext)
    submissionContext.setApplicationType("SPARK")

    // Optional settings: each is applied only when the corresponding config entry is defined.
    sparkConf.get(APPLICATION_TAGS).foreach { tags =>
      submissionContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))
    }
    sparkConf.get(MAX_APP_ATTEMPTS).fold(
      logDebug(s"${MAX_APP_ATTEMPTS.key} is not set. " +
        "Cluster's default value will be used.")
    )(attempts => submissionContext.setMaxAppAttempts(attempts))

    sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
      submissionContext.setAttemptFailuresValidityInterval(interval)
    }

    // Resources requested for the AM container: memory including overhead, plus cores.
    val amResource = Records.newRecord(classOf[Resource])
    amResource.setMemory(amMemory + amMemoryOverhead)
    amResource.setVirtualCores(amCores)

    // With a node label expression the capability travels inside an explicit ResourceRequest;
    // without one it is set directly on the submission context.
    sparkConf.get(AM_NODE_LABEL_EXPRESSION).fold(
      submissionContext.setResource(amResource)
    ) { labelExpression =>
      val request = Records.newRecord(classOf[ResourceRequest])
      request.setResourceName(ResourceRequest.ANY)
      request.setPriority(Priority.newInstance(0))
      request.setCapability(amResource)
      request.setNumContainers(1)
      request.setNodeLabelExpression(labelExpression)
      submissionContext.setAMContainerResourceRequest(request)
    }

    sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
      try {
        val aggregationContext = Records.newRecord(classOf[LogAggregationContext])
        val aggregationClass = aggregationContext.getClass

        // These two methods were added in Hadoop 2.6.4, so we still need to use reflection to
        // avoid compile error when building against Hadoop 2.6.0 ~ 2.6.3.
        val includeSetter =
          aggregationClass.getMethod("setRolledLogsIncludePattern", classOf[String])
        includeSetter.invoke(aggregationContext, includePattern)

        sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
          val excludeSetter =
            aggregationClass.getMethod("setRolledLogsExcludePattern", classOf[String])
          excludeSetter.invoke(aggregationContext, excludePattern)
        }

        submissionContext.setLogAggregationContext(aggregationContext)
      } catch {
        case NonFatal(e) =>
          logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
            "does not support it", e)
      }
    }

    submissionContext
  }
           

submitApplication()

送出app:appContext攜帶了送出應用所需的全部資訊。此調用是阻塞的,直到應用被成功送出并被ResourceManager接受後才會傳回ApplicationId。

/**
   * <p>
   * Submit a new application to <code>YARN.</code> It is a blocking call - it
   * will not return {@link ApplicationId} until the submitted application is
   * submitted successfully and accepted by the ResourceManager.
   * </p>
   * 
   * <p>
   * Users should provide an {@link ApplicationId} as part of the parameter
   * {@link ApplicationSubmissionContext} when submitting a new application,
   * otherwise it will throw the {@link ApplicationIdNotProvidedException}.
   * </p>
   *
   * <p>This internally calls {@link ApplicationClientProtocol#submitApplication
   * (SubmitApplicationRequest)}, and after that, it internally invokes
   * {@link ApplicationClientProtocol#getApplicationReport
   * (GetApplicationReportRequest)} and waits till it can make sure that the
   * application gets properly submitted. If RM fails over or RM restart
   * happens before ResourceManager saves the application's state,
   * {@link ApplicationClientProtocol
   * #getApplicationReport(GetApplicationReportRequest)} will throw
   * the {@link ApplicationNotFoundException}. This API automatically resubmits
   * the application with the same {@link ApplicationSubmissionContext} when it
   * catches the {@link ApplicationNotFoundException}</p>
   *
   * @param appContext
   *          {@link ApplicationSubmissionContext} containing all the details
   *          needed to submit a new application
   * @return {@link ApplicationId} of the accepted application
   * @throws YarnException
   * @throws IOException
   * @see #createApplication()
   */
  public abstract ApplicationId submitApplication(
      ApplicationSubmissionContext appContext) throws YarnException,
      IOException;
           

launcherBackend.setAppId(appId.toString)

private val launcherBackend = new LauncherBackend() {
    // The backend reads its launcher settings from this client's SparkConf.
    override protected def conf: SparkConf = sparkConf

    /**
     * Reacts to a stop request coming from the launcher.
     *
     * When running in cluster mode with a known application id, the application is killed
     * through the YARN client. In every other case the state KILLED is reported back and
     * `stop()` is called.
     */
    override def onStopRequest(): Unit = {
      val killThroughYarn = isClusterMode && appId != null
      if (!killThroughYarn) {
        setState(SparkAppHandle.State.KILLED)
        stop()
      } else {
        yarnClient.killApplication(appId)
      }
    }
  }
           

 LauncherBackend

/**
 * Base class for code that needs to exchange messages with a launcher server. Subclasses
 * supply the configuration (`conf`) and the reaction to a stop request (`onStopRequest`).
 *
 * See `LauncherServer` for an explanation of how launcher communication works.
 */
private[spark] abstract class LauncherBackend {

  private var clientThread: Thread = _
  private var connection: BackendConnection = _
  private var lastState: SparkAppHandle.State = _
  @volatile private var _isConnected = false

  /** Configuration from which the launcher port and secret are looked up. */
  protected def conf: SparkConf

  /**
   * Opens the connection to the launcher server.
   *
   * The port and the secret are each read from `conf` first and from the process environment
   * second. Nothing happens unless both are present. When they are, a socket to the loopback
   * address is opened, a `Hello` message carrying the secret and `SPARK_VERSION` is sent, and
   * the connection is run on a thread created by `LauncherBackend.threadFactory`.
   */
  def connect(): Unit = {
    val port = conf.getOption(LauncherProtocol.CONF_LAUNCHER_PORT)
      .orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT))
      .map(_.toInt)
    val secret = conf.getOption(LauncherProtocol.CONF_LAUNCHER_SECRET)
      .orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET))
    for {
      launcherPort <- port
      launcherSecret <- secret
    } {
      val socket = new Socket(InetAddress.getLoopbackAddress(), launcherPort)
      connection = new BackendConnection(socket)
      connection.send(new Hello(launcherSecret, SPARK_VERSION))
      clientThread = LauncherBackend.threadFactory.newThread(connection)
      clientThread.start()
      _isConnected = true
    }
  }

  /**
   * Closes the connection, if one was created, and then waits for the connection thread to
   * finish. The wait happens even when closing the connection throws.
   */
  def close(): Unit = {
    Option(connection).foreach { conn =>
      try {
        conn.close()
      } finally {
        Option(clientThread).foreach(_.join())
      }
    }
  }

  /** Sends the application id to the launcher; a no-op when there is no live connection. */
  def setAppId(appId: String): Unit = {
    if (hasLiveConnection) {
      connection.send(new SetAppId(appId))
    }
  }

  /**
   * Sends a state change to the launcher. Nothing is sent when there is no live connection or
   * when `state` equals the state that was sent last.
   */
  def setState(state: SparkAppHandle.State): Unit = {
    val changed = hasLiveConnection && lastState != state
    if (changed) {
      connection.send(new SetState(state))
      lastState = state
    }
  }

  /** Return whether the launcher handle is still connected to this backend. */
  def isConnected(): Boolean = _isConnected

  /**
   * Implementations should provide this method, which should try to stop the application
   * as gracefully as possible.
   */
  protected def onStopRequest(): Unit

  /**
   * Callback for when the launcher handle disconnects from this backend.
   */
  protected def onDisconnected() : Unit = { }

  /** True when a connection object exists and it is still flagged as connected. */
  private def hasLiveConnection: Boolean = connection != null && isConnected

  /**
   * Runs `onStopRequest()` on a separate thread from `LauncherBackend.threadFactory`, wrapped
   * in `Utils.tryLogNonFatalError`.
   */
  private def fireStopRequest(): Unit = {
    val stopTask = new Runnable() {
      override def run(): Unit = Utils.tryLogNonFatalError {
        onStopRequest()
      }
    }
    LauncherBackend.threadFactory.newThread(stopTask).start()
  }

  /** Connection to the launcher server; the only message it accepts is `Stop`. */
  private class BackendConnection(s: Socket) extends LauncherConnection(s) {

    override protected def handle(m: Message): Unit = {
      m match {
        case _: Stop =>
          fireStopRequest()

        case _ =>
          throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}")
      }
    }

    override def close(): Unit = {
      // Clear the flag first so that no further messages are sent while closing.
      _isConnected = false
      try {
        super.close()
      } finally {
        onDisconnected()
      }
    }

  }

}

private object LauncherBackend {

  /** Factory for the threads used by `LauncherBackend`, created with the name "LauncherBackend". */
  val threadFactory: java.util.concurrent.ThreadFactory =
    ThreadUtils.namedThreadFactory("LauncherBackend")

}
           

reportLauncherState(SparkAppHandle.State.SUBMITTED)

向launcher報告應用的狀態(這裡是SUBMITTED),實際上就是調用launcherBackend.setState()

/** Forwards the given application state to the launcher backend. */
def reportLauncherState(state: SparkAppHandle.State): Unit =
    launcherBackend.setState(state)
           

繼續閱讀