天天看点

spark源码分析之submit的提交过程(二)

spark源码分析之submit的提交过程(二)

此文接上文spark源码分析之submit的提交过程(一)

  • 5.接下来执行mainMethod.invoke(null, childArgs.toArray)

    也就是执行org.apache.spark.deploy.yarn.Client

    def main(argStrings: Array[String]) {
        if (!sys.props.contains("SPARK_SUBMIT")) {
          logWarning("WARNING: This client is deprecated and will be removed in a " +
            "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
        }
    
        // Set an env variable indicating we are running in YARN mode.
        // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
        System.setProperty("SPARK_YARN_MODE", "true")
        val sparkConf = new SparkConf
        // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
        // so remove them from sparkConf here for yarn mode.
        
        //因为使用的是yarn模式,所以移除spark的配置参数
        sparkConf.remove("spark.jars")
        sparkConf.remove("spark.files")
        val args = new ClientArguments(argStrings)
        new Client(args, sparkConf).run()
      }
               
    • 1.在创建Client类的时候,传入了一些配置参数childArgs.toArray
    • 2.然后在main方法中,对这些参数又做了一些封装

      ​ userClass ==> --class ==>WordCount

      • 我们来看一下ClientArguements(argStrings)
        package org.apache.spark.deploy.yarn
        
        import scala.collection.mutable.ArrayBuffer
        
        // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
        private[spark] class ClientArguments(args: Array[String]) {
        
          var userJar: String = null
          var userClass: String = null
          var primaryPyFile: String = null
          var primaryRFile: String = null
          var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
        
          parseArgs(args.toList)
        
          private def parseArgs(inputArgs: List[String]): Unit = {
            var args = inputArgs
        
            while (!args.isEmpty) {
              args match {
                case ("--jar") :: value :: tail =>
                  userJar = value
                  args = tail
        
                case ("--class") :: value :: tail =>
                  userClass = value
                  args = tail
        
                case ("--primary-py-file") :: value :: tail =>
                  primaryPyFile = value
                  args = tail
        
                case ("--primary-r-file") :: value :: tail =>
                  primaryRFile = value
                  args = tail
        
                case ("--arg") :: value :: tail =>
                  userArgs += value
                  args = tail
        
                case Nil =>
        
                case _ =>
                  throw new IllegalArgumentException(getUsageMessage(args))
              }
            }
        
            if (primaryPyFile != null && primaryRFile != null) {
              throw new IllegalArgumentException("Cannot have primary-py-file and primary-r-file" +
                " at the same time")
            }
          }
        
          private def getUsageMessage(unknownParam: List[String] = null): String = {
            val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
            message +
              s"""
              |Usage: org.apache.spark.deploy.yarn.Client [options]
              |Options:
              |  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster
              |                           mode)
              |  --class CLASS_NAME       Name of your application's main class (required)
              |  --primary-py-file        A main Python file
              |  --primary-r-file         A main R file
              |  --arg ARG                Argument to be passed to your application's main class.
              |                           Multiple invocations are possible, each will be passed in order.
              """.stripMargin
          }
        }
                   
    • 3.创建了一个Client对象(这个类主要功能是创建/配置ApplicationMaster的应用程序,准备相关的环境与资源。)

      我们在创建Client对象的时候,其实我们的底层维护的是yarn客户端(yarnClient既不是一个进程,也不是一个线程,它就是在SparkSubmit进程中创建的一个对象)

      //在Client源码中我们发现 ,创建的其实是yarn客户端
      private val yarnClient = YarnClient.createYarnClient
                 
      • Client源码主要方法分析

        submitApplication()

        将运行ApplicationMaster的应用程序提交到ResourceManager。

        • 稳定的Yarn API提供了一种方便的方法(YarnClient#createApplication)
        • 创建应用程序并设置应用程序提交上下文
        主要逻辑有:
        -  从RM获取新的应用程序
        -  设置应用程序的staging目录:如果配置STAGING_DIR,则使用其值作为staging目录。否则使用用户的home目录。
        -  验证群集是否有足够的资源用于AM
        -  设置适当的上下文以启动我们的AM
        -  最后,提交并监控应用程序
                   

        createApplicationSubmissionContext()

        设置提交ApplicationMaster的上下文。== 主要逻辑如下:

        • 如果是cluster模式,则获取所有与

          spark.yarn.driver.resource

          相关的配置。否则使用

          spark.yarn.am.resource

          相关的配置
        • 获取YarnClientApplication的默认上下文,并在此基础上设置
          • ApplicationName

          • QUEUE_NAME

          • containerContext

          • ApplicationType

          • 所有

            APPLICATION_TAGS

          • MAX_APP_ATTEMPTS

          • 其他配置
        • 设置资源
          • capability.setMemory(amMemory + amMemoryOverhead)

          • capability.setVirtualCores(amCores)

        • 其他配置

        setupSecurityToken()

        ​ 设置安全令牌以启动我们的ApplicationMaster容器。 在客户端模式下,调度程序已获取一组凭据,因此将它们复制并发送到AM。 在群集模式下,获取新凭据,然后将其与当前用户已有的任何凭据一起发送到AM。

        getApplicationReport()

        ​ 从ResourceManager获取我们提交的应用程序的应用程序报告。

        getClientToken()

        ​ 返回此客户端使用的安全令牌以与ApplicationMaster通信。 如果未启用安全性,则报告返回的标记为空。

        verifyClusterResources()

        检查分配的资源是否合理,如果我们请求每个容器的资源多于群集中可用的资源,则会失败。

        主要逻辑:

        val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
            
         val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
         //compare if executorMem > maxMem
         //...
         val amMem = amMemory + amMemoryOverhead
         //compare if amMem > maxMem
         //...
                   

        copyFileToRemote()

        ​ 如果需要,将给定资源文件复制到远程文件系统(例如HDFS)。仅当源和目标文件系统不同或源方案为“file”时,才会复制该文件。 用于准备启动ApplicationMaster容器的资源,例如用户其他的其他辅助文件。

        prepareLocalResources()

        ​ 如果需要,将任何资源上载到分布式缓存。 如果要在本地使用资源,请为下游代码设置适当的配置以正确处理它。 这用于为ApplicationMaster设置容器启动上下文。

        远程目录地址

        stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))

        distribute()

        ​ 将文件分发到群集。 如果文件的路径是“local:”URI,它实际上不是分发的,而其他文件将被复制到HDFS(如果还没有)并添加到应用程序的分布式缓存中。

        主要逻辑:

        • Keytab文件
          • 如果我们传入keytab,请确保将keytab表复制到HDFS上的登台目录,并设置相关的环境变量,以便AM可以再次登录。
        • 配置文件以及jar包 有两个设置可控制要添加到缓存的文件:
          • 如果定义了Spark归档文件,请使用归档文件。 该存档应在其根目录中包含jar文件。
          • 如果提供了jar列表,则过滤非本地jar,解析globs,并将找到的文件添加到缓存中。
          请注意,存档不能是“本地”URI。 如果未找到上述任何设置,则上传$ SPARK_HOME/jars中找到的所有文件。
        • 其他资源 对通过ClientArguments传递的任何其他资源执行相同操作。 每个资源类别由3元组表示: (1)此类别中逗号分隔的资源列表, (2)资源类型, (3)是否将这些资源添加到类路径中
        • python文件 需要特别处理python文件列表。 所有非归档文件都需要放在将添加到PYTHONPATH的子目录中。
        • 更新配置里面的所有分布式文件的列表更新配置(conf存档除外)。 conf存档将由AM以不同方式处理。
        • 手动将conf存档上传到HDFS,并在配置中记录其位置。这将允许AM知道conf存档在HDFS中的位置,以便可以将其分发到容器中。
        • 手动将配置存档添加到缓存管理器,以便在设置正确文件的情况下启动AM。

        createConfArchive()

        ​ 使用配置文件创建存档以进行分发。 这些将由AM和执行者使用。 这些文件被压缩并作为存档添加到作业中,因此YARN会在分发给AM和执行程序时进行解压。 然后将此目录添加到AM和执行程序进程的类路径中,以确保每个人都使用相同的默认配置。 这遵循启动脚本设置的优先顺序,其中HADOOP_CONF_DIR在YARN_CONF_DIR之前的类路径中显示。 存档还包含一些Spark配置。 即,它将SparkConf的内容保存在由AM进程加载的文件中。

        setupLaunchEnv()

        ​ 设置启动ApplicationMaster容器的环境。如

        DRIVER_CLASS_PATH

        ,

        PYTHONPATH

        ,

        PYSPARK_DRIVER_PYTHON

        ,

        PYSPARK_PYTHON

        ,

        PYTHONHASHSEED

        createContainerLaunchContext()

        ​ 设置ContainerLaunchContext以启动我们的ApplicationMaster容器。 这将设置启动环境,java选项以及启动AM的命令。

        monitorApplication()

        ​ 报告应用程序的状态,直到它已成功或由于某些故障退出,然后返回一对纱线应用状态(FINISHED, FAILED, KILLED, or RUNNING)和最终应用状态(FINISHED, FAILED, KILLED, or RUNNING)。

        run()

        ​ 将应用程序提交到ResourceManager。 如果将spark.yarn.submit.waitAppCompletion设置为true,它将保持活动状态,报告应用程序的状态,直到应用程序因任何原因退出。 否则,客户端进程将在提交后退出。 如果应用程序以失败,终止或未定义状态完成,则抛出适当的SparkException。

    • 书回正传,在我们创建了Client对象以后,这个对象也执行了run()方法
  • 6.执行run()方法
    def run(): Unit = {
        //发送请求给yarn服务器,获取AppId(向resourcemanager发送请求)
        this.appId = submitApplication()
        if (!launcherBackend.isConnected() && fireAndForget) {
          val report = getApplicationReport(appId)
          val state = report.getYarnApplicationState
          logInfo(s"Application report for $appId (state: $state)")
          logInfo(formatReportDetails(report))
          if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
            throw new SparkException(s"Application $appId finished with status: $state")
          }
        } else {
          val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
          if (yarnApplicationState == YarnApplicationState.FAILED ||
            finalApplicationStatus == FinalApplicationStatus.FAILED) {
            throw new SparkException(s"Application $appId finished with failed status")
          }
          if (yarnApplicationState == YarnApplicationState.KILLED ||
            finalApplicationStatus == FinalApplicationStatus.KILLED) {
            throw new SparkException(s"Application $appId is killed")
          }
          if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
            throw new SparkException(s"The final status of application $appId is undefined")
          }
        }
      }
               
    • 第一行代码就是向resourcemanager发送请求获取Application_id

      篇幅过长,请看下一篇博文

继续阅读