
2. Spark Source Code Analysis: org.apache.spark.launcher.Main

Whether you start spark-shell, submit a jar with spark-submit, or run any of the other master or worker scripts, execution eventually reaches spark-class, which invokes org.apache.spark.launcher.Main to build the command that will actually be executed.

java -Xmx128m -cp ...jars org.apache.spark.launcher.Main "$@"           

In other words, org.apache.spark.launcher.Main is invoked by spark-class and receives its arguments from spark-class. It is a utility class for Spark's internal scripts, not the real execution entry point. Its job is to delegate to other classes, parse the arguments, and generate the command to execute, which is then handed back to spark-class and run there via exec "${CMD[@]}".
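As a minimal standalone sketch of that output protocol (not Spark code; the command list below is hypothetical), this is what the NUL-separated output on Unix-like systems looks like before spark-class reads it back into the CMD array and calls exec "${CMD[@]}":

import java.util.Arrays;
import java.util.List;

// Standalone sketch of the output protocol described above (not Spark code).
// On Unix-like systems, Main prints every argument of the final command
// followed by a NUL ('\0') byte; spark-class collects them into CMD.
public class NullSeparatedOutputSketch {
  public static void main(String[] args) {
    // Hypothetical final command; a real one carries the full classpath and options.
    List<String> cmd = Arrays.asList(
        "/usr/bin/java", "-cp", "/opt/spark/jars/*",
        "org.apache.spark.deploy.SparkSubmit",
        "--class", "org.apache.spark.repl.Main", "--name", "Spark shell");
    for (String c : cmd) {
      System.out.print(c);
      System.out.print('\0');  // NUL is safe as a separator: it cannot appear inside an argument
    }
  }
}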

Depending on the type of submission, spark-submit versus spark-class (master, worker, history server, and so on), it builds the corresponding command-builder object, SparkSubmitCommandBuilder or SparkClassCommandBuilder, and then constructs the final command through its buildCommand method.

Let's take a quick look at the SparkSubmit arguments at this point; Master and Worker will be covered later:

Mode: spark-shell
Arguments:

org.apache.spark.deploy.SparkSubmit
  --class org.apache.spark.repl.Main
  --name "Spark shell"

Mode: spark-submit
Arguments:

  --class com.idmapping.scala.WordCount \
  --master yarn \
  --deploy-mode client \
  --driver-memory 4G \
  --executor-memory 3G \
  --executor-cores 2 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.default.parallelism=24 \
  /user/jars/idmapping-job-1.0-SNAPSHOT.jar \
  file:///user/tmp/words.txt file:///user/data/wordcount/
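To connect the table with the code below: for the spark-shell row, the argument array that spark-class passes to org.apache.spark.launcher.Main roughly looks like the following sketch (the master URL is illustrative only):

// Hypothetical argsArray received by org.apache.spark.launcher.Main when spark-shell starts.
public class SparkShellArgsSketch {
  public static void main(String[] ignored) {
    String[] argsArray = {
        "org.apache.spark.deploy.SparkSubmit",   // className, removed first in Main.main
        "--class", "org.apache.spark.repl.Main",
        "--name", "Spark shell",
        "--master", "spark://host:7077"          // illustrative master URL
    };
    System.out.println(String.join(" ", argsArray));
  }
}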

/**
 * Command line interface for the Spark launcher. Used internally by Spark scripts.
 */
// Utility class provided for Spark's internal scripts

class Main {

  /**
   * Usage: Main [class] [class args]
   * <p>
   * There are two modes: spark-submit and spark-class.
   * In the spark-class case, the submitted class may be the Master, Worker, HistoryServer, and so on.
   * This CLI works in two different modes:
   * <ul>
   *   <li>"spark-submit": if <i>class</i> is "org.apache.spark.deploy.SparkSubmit", the
   *   {@link SparkLauncher} class is used to launch a Spark application.</li>
   *   <li>"spark-class": if another class is provided, an internal Spark class is run.</li>
   * </ul>
   *
   * On Unix-like systems the output is a list of arguments; on Windows it is a single space-separated command line.
   * This class works in tandem with the "bin/spark-class" script on Unix-like systems, and
   * "bin/spark-class2.cmd" batch script on Windows to execute the final command.
   * <p>
   * On Unix-like systems, the output is a list of command arguments, separated by the NULL
   * character. On Windows, the output is a command line suitable for direct execution from the
   * script.
   */

  /**
   * The main method here mainly parses the arguments and puts what is needed into the command builder.
   * When spark-shell is started directly, the arguments passed in through spark-class are:
   * org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell"
   * --master spark://host:7077
   */
  public static void main(String[] argsArray) throws Exception {

    // Validate the argument list
    checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");

  /**
   * Copy the argument list into the args collection.
   * Remove the first element and assign it to className, i.e. the class to run. The remaining arguments are:
   * --class org.apache.spark.repl.Main --name "Spark shell" --master spark://host:7077
   */
    List<String> args = new ArrayList<>(Arrays.asList(argsArray));
    String className = args.remove(0);

    // Decide whether to print the launch command (SPARK_PRINT_LAUNCH_COMMAND)
    // Declare the command builder, created below
    boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
    AbstractCommandBuilder builder;
    
    // Build the command-builder object: spark-submit or spark-class
    // All arguments are parsed and stored in that builder object
    // In other words, this is where spark-submit and master/worker/etc. submissions are split apart, each with its own arguments
    if (className.equals("org.apache.spark.deploy.SparkSubmit")) {

      // Validation and error/help handling for the spark-submit case
      try {
        // Build the spark-submit command builder
        builder = new SparkSubmitCommandBuilder(args);
      } catch (IllegalArgumentException e) {
        printLaunchCommand = false;
        System.err.println("Error: " + e.getMessage());
        System.err.println();

        // Recover the main class, e.g. --class org.apache.spark.repl.Main
        MainClassOptionParser parser = new MainClassOptionParser();
        try {
          parser.parse(args);
        } catch (Exception ignored) {
          // Ignore parsing exceptions.
        }

        // Help information
        List<String> help = new ArrayList<>();
        if (parser.className != null) {
          help.add(parser.CLASS);
          help.add(parser.className);
        }
        help.add(parser.USAGE_ERROR);
        // Build a spark-submit builder that only prints usage information
        builder = new SparkSubmitCommandBuilder(help);
      }
    } else {
      // Build the spark-class command builder
      // The command and its arguments are parsed inside that class
      builder = new SparkClassCommandBuilder(className, args);
    }

    // This is where the command is actually built
    // builder.buildCommand(env) returns the command as a list of arguments
    // and collects any environment variables needed by the child process into the env map
    Map<String, String> env = new HashMap<>();
    List<String> cmd = builder.buildCommand(env);
    if (printLaunchCommand) {
      System.err.println("Spark Command: " + join(" ", cmd));
      System.err.println("========================================");
    }

    // On Windows, print a single command line suitable for direct execution
    if (isWindows()) {
      System.out.println(prepareWindowsCommand(cmd, env));
    } else {
      // In bash, use NULL as the arg separator since it cannot be used in an argument.
      List<String> bashCmd = prepareBashCommand(cmd, env);
      for (String c : bashCmd) {
        System.out.print(c);
        System.out.print('\0');
      }
    }
  }


  // The helpers below handle the Windows case
  // Spark is mostly run on Linux, so the Windows path matters less here

  // Build the command line to execute on Windows
  private static String prepareWindowsCommand(List<String> cmd, Map<String, String> childEnv) {
    StringBuilder cmdline = new StringBuilder();
    for (Map.Entry<String, String> e : childEnv.entrySet()) {
      cmdline.append(String.format("set %s=%s", e.getKey(), e.getValue()));
      cmdline.append(" && ");
    }
    for (String arg : cmd) {
      cmdline.append(quoteForBatchScript(arg));
      cmdline.append(" ");
    }
    return cmdline.toString();
  }

  /**
   * Prepare the command for execution from a bash script. The final command will have commands to
   * set up any needed environment variables needed by the child process.
   */
  // Prepare the environment variables needed when the command is run from a bash script (Unix-like systems)
  private static List<String> prepareBashCommand(List<String> cmd, Map<String, String> childEnv) {
    if (childEnv.isEmpty()) {
      return cmd;
    }

    List<String> newCmd = new ArrayList<>();
    newCmd.add("env");

    for (Map.Entry<String, String> e : childEnv.entrySet()) {
      newCmd.add(String.format("%s=%s", e.getKey(), e.getValue()));
    }
    newCmd.addAll(cmd);
    return newCmd;
  }

  // When spark-submit argument parsing fails, this parser makes one more attempt to recover the main class before the usage message is built
  private static class MainClassOptionParser extends SparkSubmitOptionParser {

    String className;
    @Override
    protected boolean handle(String opt, String value) {
      if (CLASS.equals(opt)) {
        className = value;
      }
      return false;
    }
    @Override
    protected boolean handleUnknown(String opt) {
      return false;
    }
    @Override
    protected void handleExtraArgs(List<String> extra) {
    }
  }
}
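To make the two helpers concrete, here is a small standalone sketch (a simplified re-implementation, not Spark code; the environment variable is hypothetical and batch-script quoting is omitted) of what they produce for a sample command:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of the transformations done by prepareBashCommand and prepareWindowsCommand.
public class PrepareCommandSketch {
  public static void main(String[] args) {
    List<String> cmd = Arrays.asList("java", "-cp", "/opt/spark/jars/*",
        "org.apache.spark.deploy.SparkSubmit");
    Map<String, String> env = new LinkedHashMap<>();
    env.put("SPARK_SUBMIT_OPTS", "-Dfoo=bar");  // hypothetical child env variable

    // Unix-like: prefix the command with "env KEY=VALUE ..." so the child process sees the variables.
    List<String> bashCmd = new ArrayList<>();
    bashCmd.add("env");
    env.forEach((k, v) -> bashCmd.add(k + "=" + v));
    bashCmd.addAll(cmd);
    System.out.println(bashCmd);
    // [env, SPARK_SUBMIT_OPTS=-Dfoo=bar, java, -cp, /opt/spark/jars/*, org.apache.spark.deploy.SparkSubmit]

    // Windows: one "set KEY=VALUE && ..." command line for direct execution by the batch script.
    StringBuilder cmdline = new StringBuilder();
    env.forEach((k, v) -> cmdline.append(String.format("set %s=%s && ", k, v)));
    cmd.forEach(arg -> cmdline.append(arg).append(' '));
    System.out.println(cmdline);
    // set SPARK_SUBMIT_OPTS=-Dfoo=bar && java -cp /opt/spark/jars/* org.apache.spark.deploy.SparkSubmit
  }
}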
           

SparkSubmitCommandBuilder

// --class org.apache.spark.repl.Main --name "Spark shell" --master spark://host:7077
  SparkSubmitCommandBuilder(List<String> args) {
    this.allowsMixedArguments = false;
    this.sparkArgs = new ArrayList<>();
    boolean isExample = false;
    List<String> submitArgs = args;

    if (args.size() > 0) {
      // Check whether this is sparkR-shell or pyspark-shell
      switch (args.get(0)) {
        case PYSPARK_SHELL:
          this.allowsMixedArguments = true;
          appResource = PYSPARK_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case SPARKR_SHELL:
          this.allowsMixedArguments = true;
          appResource = SPARKR_SHELL;
          submitArgs = args.subList(1, args.size());
          break;
        
        // run-example: mark this as an example application
        case RUN_EXAMPLE:
          isExample = true;
          submitArgs = args.subList(1, args.size());
      }

      this.isExample = isExample;
      OptionParser parser = new OptionParser();
      parser.parse(submitArgs);
      this.isAppResourceReq = parser.isAppResourceReq;
    }  else {
      this.isExample = isExample;
      this.isAppResourceReq = false;
    }
  }           

 OptionParser extends SparkSubmitOptionParser, and its parse method is inherited directly from SparkSubmitOptionParser. It walks through the trailing "--name value" style arguments and filters out anything that does not match the recognized options.

private class OptionParser extends SparkSubmitOptionParser {}           

parse()

// --class org.apache.spark.repl.Main --name "Spark shell" --master spark://host:7077
  protected final void parse(List<String> args) {
    // Regex matching options written as --key=value, e.g. --name="Spark shell"
    Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

    // Loop over the submitted arguments
    int idx = 0;
    for (idx = 0; idx < args.size(); idx++) {
      String arg = args.get(idx);
      String value = null;

      // Split the option name from its value
      // e.g. for --name="Spark shell", arg becomes --name and value becomes "Spark shell"
      Matcher m = eqSeparatedOpt.matcher(arg);
      if (m.matches()) {
        arg = m.group(1);
        value = m.group(2);
      }

      // Look for options with a value.
      // Options that take a value
      // Match the option name against the entries in the opts table
      String name = findCliOption(arg, opts);
      if (name != null) {
        if (value == null) {
          // e.g. for "--class value" the size is 2, idx is 0 and size-1 is 1, so a value still follows
          // if the option is the last element (idx == size-1) there is no value and an error is thrown
          if (idx == args.size() - 1) {
            throw new IllegalArgumentException(
                String.format("Missing argument for option '%s'.", arg));
          }

          // Otherwise take the next argument as the value
          idx++;
          value = args.get(idx);
        }

        // Invoke the handle() callback, overridden in the subclass, with the matched option and its value
        // If handle() returns false, stop parsing; otherwise continue with the next argument
        if (!handle(name, value)) {
          break;
        }
        continue;
      }

      // Look for a switch.
      // Next, check whether the argument is a value-less switch such as --help or --verbose
      // by matching it against the switches table
      name = findCliOption(arg, switches);
      if (name != null) {
        if (!handle(name, null)) {
          break;
        }
        continue;
      }

      // Unrecognized argument: let handleUnknown() decide whether parsing continues
      if (!handleUnknown(arg)) {
        break;
      }
    }

    if (idx < args.size()) {
      idx++;
    }

    // Any remaining arguments are handed to handleExtraArgs(); how they are treated depends on the subclass
    handleExtraArgs(args.subList(idx, args.size()));
  }           
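A quick standalone demonstration of the eqSeparatedOpt pattern (the option values below are hypothetical) shows why both the "--name value" and "--name=value" styles work:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone demonstration of the eqSeparatedOpt regex used in parse().
public class EqSeparatedOptDemo {
  public static void main(String[] args) {
    Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

    // "--key=value" form: the regex splits it into option name and value.
    Matcher m = eqSeparatedOpt.matcher("--name=Spark shell");
    if (m.matches()) {
      System.out.println(m.group(1));  // --name
      System.out.println(m.group(2));  // Spark shell
    }

    // A bare option does not match, so parse() takes the value from the next
    // element of the argument list instead (the "--name value" form).
    System.out.println(eqSeparatedOpt.matcher("--verbose").matches());  // false
  }
}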

At this point the command has been fully built; it is returned to spark-class, which finally runs it with exec "${CMD[@]}".

And that, essentially, is the startup flow of spark-shell and spark-submit.
