2. Spark Source Code Analysis: org.apache.spark.launcher.Main

Whether you start spark-shell, submit a jar through spark-submit, or run one of the other scripts such as those for the master or worker, execution eventually lands in spark-class, which invokes org.apache.spark.launcher.Main to build the command to execute.

java -Xmx128m -cp ...jars org.apache.spark.launcher.Main "$@"           

In other words, org.apache.spark.launcher.Main is invoked by spark-class and receives its arguments from it. It is a utility class for Spark's internal scripts, not the real execution entry point. It delegates to other classes to parse the arguments and generate the execution command, which is finally handed back to spark-class and run there via exec "${CMD[@]}".

Based on the type of launch, spark-submit or spark-class (master, worker, history server, and so on), it builds the matching command builder, SparkSubmitCommandBuilder or SparkClassCommandBuilder, and then constructs the execution command through its buildCommand method.

Here is a rough look at the SparkSubmit arguments at this point; Master and Worker will be analyzed in later posts:

Mode: spark-shell
Arguments:

org.apache.spark.deploy.SparkSubmit
--class org.apache.spark.repl.Main
--name "Spark shell"

Mode: spark-submit
Arguments:

--class com.idmapping.scala.WordCount \
--master yarn \
--deploy-mode client \
--driver-memory 4G \
--executor-memory 3G \
--executor-cores 2 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.default.parallelism=24 \
/user/jars/idmapping-job-1.0-SNAPSHOT.jar \
file:///user/tmp/words.txt file:///user/data/wordcount/

/**
 * Command line interface for the Spark launcher. Used internally by Spark scripts.
 */
// Utility class for Spark's internal scripts

class Main {

  /**
   * Usage: Main [class] [class args]
   * <p>
   * There are two modes: spark-submit and spark-class.
   * In spark-class mode the class being launched can be one of several internal entry points, such as master/worker/history server.
   * This CLI works in two different modes:
   * <ul>
   *   <li>"spark-submit": if <i>class</i> is "org.apache.spark.deploy.SparkSubmit", the
   *   {@link SparkLauncher} class is used to launch a Spark application.</li>
   *   <li>"spark-class": if another class is provided, an internal Spark class is run.</li>
   * </ul>
   *
   * On Unix-like systems the output is a list of arguments separated by the NULL character; on Windows it is a single command line.
   * This class works in tandem with the "bin/spark-class" script on Unix-like systems, and
   * "bin/spark-class2.cmd" batch script on Windows to execute the final command.
   * <p>
   * On Unix-like systems, the output is a list of command arguments, separated by the NULL
   * character. On Windows, the output is a command line suitable for direct execution from the
   * script.
   */

  /**
   * main() mainly parses the arguments and stores what it needs in a command builder object.
   * When spark-shell is launched, spark-class passes in arguments such as:
   * org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell" 
   * --master spark://host:7077
   */
  public static void main(String[] argsArray) throws Exception {

    // Require at least one argument: the class name to run
    checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");

  /** 
   * 将參數清單放入args集合中
   * 移出第一個參數指派給classname,即執行程式。剩餘參數為:
   * --class org.apache.spark.repl.Main --name "Spark shell" --master spark://host:7077
   */
    List<String> args = new ArrayList<>(Arrays.asList(argsArray));
    String className = args.remove(0);

    // Check SPARK_PRINT_LAUNCH_COMMAND to decide whether to print the final command
    // Declare the command builder
    boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
    AbstractCommandBuilder builder;
    
    // Build the right builder object: spark-submit vs. spark-class
    // The arguments are pulled out, parsed, and stored in that builder
    // i.e. submit vs. master/worker etc. is decided here, along with the matching arguments
    if (className.equals("org.apache.spark.deploy.SparkSubmit")) {

      // spark-submit path, including error handling and usage/help output
      try {
        // Build the spark-submit command builder
        builder = new SparkSubmitCommandBuilder(args);
      } catch (IllegalArgumentException e) {
        printLaunchCommand = false;
        System.err.println("Error: " + e.getMessage());
        System.err.println();

        // Recover the main class, e.g. --class org.apache.spark.repl.Main
        MainClassOptionParser parser = new MainClassOptionParser();
        try {
          parser.parse(args);
        } catch (Exception ignored) {
          // Ignore parsing exceptions.
        }

        // Usage/help information
        List<String> help = new ArrayList<>();
        if (parser.className != null) {
          help.add(parser.CLASS);
          help.add(parser.className);
        }
        help.add(parser.USAGE_ERROR);
        // Build a spark-submit builder that will report the usage error
        builder = new SparkSubmitCommandBuilder(help);
      }
    } else {
      // Build the spark-class command builder
      // This is where the class name and its arguments are parsed
      builder = new SparkClassCommandBuilder(className, args);
    }

    // The execution command is actually built here
    // by calling buildCommand on the builder created above;
    // env collects any environment variables the child process needs
    Map<String, String> env = new HashMap<>();
    List<String> cmd = builder.buildCommand(env);
    if (printLaunchCommand) {
      System.err.println("Spark Command: " + join(" ", cmd));
      System.err.println("========================================");
    }

    // On Windows print a single command line; on Unix-like systems print NULL-separated arguments
    if (isWindows()) {
      System.out.println(prepareWindowsCommand(cmd, env));
    } else {
      // In bash, use NULL as the arg separator since it cannot be used in an argument.
      List<String> bashCmd = prepareBashCommand(cmd, env);
      for (String c : bashCmd) {
        System.out.print(c);
        System.out.print('\0');
      }
    }
  }


  // The helpers below deal with the Windows case.
  // Spark is mostly run on Linux, so this path gets little attention here.

  // Build the Windows command line
  private static String prepareWindowsCommand(List<String> cmd, Map<String, String> childEnv) {
    StringBuilder cmdline = new StringBuilder();
    for (Map.Entry<String, String> e : childEnv.entrySet()) {
      cmdline.append(String.format("set %s=%s", e.getKey(), e.getValue()));
      cmdline.append(" && ");
    }
    for (String arg : cmd) {
      cmdline.append(quoteForBatchScript(arg));
      cmdline.append(" ");
    }
    return cmdline.toString();
  }

  /**
   * Prepare the command for execution from a bash script. The final command will have commands to
   * set up any needed environment variables needed by the child process.
   */
  // Prepare the environment variables needed when the command is run from bash (the Unix-like path)
  private static List<String> prepareBashCommand(List<String> cmd, Map<String, String> childEnv) {
    if (childEnv.isEmpty()) {
      return cmd;
    }

    List<String> newCmd = new ArrayList<>();
    newCmd.add("env");

    for (Map.Entry<String, String> e : childEnv.entrySet()) {
      newCmd.add(String.format("%s=%s", e.getKey(), e.getValue()));
    }
    newCmd.addAll(cmd);
    return newCmd;
  }

  // When parsing the spark-submit arguments fails, this parser makes a second pass just to recover
  // the --class value, so that class-specific usage information can be printed
  private static class MainClassOptionParser extends SparkSubmitOptionParser {

    String className;
    @Override
    protected boolean handle(String opt, String value) {
      if (CLASS.equals(opt)) {
        className = value;
      }
      return false;
    }
    @Override
    protected boolean handleUnknown(String opt) {
      return false;
    }
    @Override
    protected void handleExtraArgs(List<String> extra) {
    }
  }
}
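
As the class comment above notes, in spark-submit mode this plumbing is what the public SparkLauncher API relies on to launch a Spark application. For reference, here is a minimal sketch of launching an application programmatically with org.apache.spark.launcher.SparkLauncher; the jar path, main class, master, and arguments below are placeholders, not values taken from this article.

import org.apache.spark.launcher.SparkLauncher;

public class LauncherExample {
  public static void main(String[] args) throws Exception {
    // All values below are placeholders for illustration only.
    Process spark = new SparkLauncher()
        .setAppResource("/path/to/app.jar")          // application jar
        .setMainClass("com.example.MyApp")           // application main class
        .setMaster("yarn")
        .setDeployMode("client")
        .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
        .addAppArgs("input.txt", "output/")
        .launch();                                   // starts spark-submit as a child process
    spark.waitFor();
  }
}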
           

SparkSubmitCommandBuilder

// --class org.apache.spark.repl.Main --name "Spark shell" --master spark://host:7077
  SparkSubmitCommandBuilder(List<String> args) {
    this.allowsMixedArguments = false;
    this.sparkArgs = new ArrayList<>();
    boolean isExample = false;
    List<String> submitArgs = args;

    if (args.size() > 0) {
      // Is this pyspark-shell or sparkR-shell?
      switch (args.get(0)) {
        case PYSPARK_SHELL:
          this.allowsMixedArguments = true;
          appResource = PYSPARK_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case SPARKR_SHELL:
          this.allowsMixedArguments = true;
          appResource = SPARKR_SHELL;
          submitArgs = args.subList(1, args.size());
          break;
        
        // run-example: running one of the bundled example programs
        case RUN_EXAMPLE:
          isExample = true;
          submitArgs = args.subList(1, args.size());
      }

      this.isExample = isExample;
      OptionParser parser = new OptionParser();
      parser.parse(submitArgs);
      this.isAppResourceReq = parser.isAppResourceReq;
    }  else {
      this.isExample = isExample;
      this.isAppResourceReq = false;
    }
  }           

OptionParser extends SparkSubmitOptionParser and uses the parse method inherited from it. parse filters the --name value style options that follow, rejecting anything that does not match the known option tables.

private class OptionParser extends SparkSubmitOptionParser {}           

parse()

// --class org.apache.spark.repl.Main --name "Spark shell" --master spark://host:7077
  protected final void parse(List<String> args) {
    // Regex matching options written in --name=value form, e.g. --name="Spark shell"
    Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

    // Iterate over the submitted arguments
    int idx = 0;
    for (idx = 0; idx < args.size(); idx++) {
      String arg = args.get(idx);
      String value = null;

      // Split the option name from its value
      // e.g. --name="Spark shell" becomes arg = --name and value = "Spark shell"
      Matcher m = eqSeparatedOpt.matcher(arg);
      if (m.matches()) {
        arg = m.group(1);
        value = m.group(2);
      }

      // Look for options with a value.
      // Match arg against opts, the table of options that take a value
      String name = findCliOption(arg, opts);
      if (name != null) {
        if (value == null) {
          // e.g. for "--class value": size=2, idx=0 < size-1=1, so a value follows
          // if the option is the last argument (idx == size-1), no value follows: fail
          if (idx == args.size() - 1) {
            throw new IllegalArgumentException(
                String.format("Missing argument for option '%s'.", arg));
          }

          // Otherwise take the next argument as the value
          idx++;
          value = args.get(idx);
        }

        // Call the handle() callback (overridden in subclasses) with the recognized option and its value
        // If handle() returns false, stop parsing; otherwise continue with the next argument
        if (!handle(name, value)) {
          break;
        }
        continue;
      }

      // Look for a switch.
      // Otherwise check whether the argument is a value-less switch such as --help or --verbose
      // by matching it against the switches table
      name = findCliOption(arg, switches);
      if (name != null) {
        if (!handle(name, null)) {
          break;
        }
        continue;
      }

      // Anything else is treated as an unknown argument
      if (!handleUnknown(arg)) {
        break;
      }
    }

    if (idx < args.size()) {
      idx++;
    }

    // Any remaining arguments are handed to handleExtraArgs(); what to do with them is up to the subclass
    handleExtraArgs(args.subList(idx, args.size()));
  }           
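
To make the --name=value handling above concrete, the following self-contained snippet (not part of Spark) applies the same regex to a single argument; the sample input is made up for illustration:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EqSeparatedOptDemo {
  public static void main(String[] args) {
    // Same pattern as in parse(): group 1 is the option name, group 2 is the value
    Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");
    String arg = "--name=Spark shell";               // hypothetical input
    Matcher m = eqSeparatedOpt.matcher(arg);
    if (m.matches()) {
      System.out.println("option: " + m.group(1));   // prints: option: --name
      System.out.println("value : " + m.group(2));   // prints: value : Spark shell
    }
  }
}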

At this point the command has been built; it is returned to spark-class, which finally runs it with exec "${CMD[@]}".
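
As a rough illustration of why NULL rather than a space is used as the separator (arguments such as "Spark shell" contain spaces), the standalone snippet below joins a made-up command the way Main prints it on Unix-like systems and then splits it back, which is roughly what spark-class's read -d '' loop does in bash:

import java.util.Arrays;
import java.util.List;

public class NullSeparatorDemo {
  public static void main(String[] args) {
    // A made-up command; note the argument containing a space.
    List<String> cmd = Arrays.asList(
        "java", "org.apache.spark.deploy.SparkSubmit",
        "--class", "org.apache.spark.repl.Main",
        "--name", "Spark shell");

    // Print each argument followed by the NULL character, as Main does.
    StringBuilder out = new StringBuilder();
    for (String c : cmd) {
      out.append(c).append('\0');
    }

    // Split the stream back into arguments on NULL; "Spark shell" survives as one argument.
    String[] recovered = out.toString().split("\0");
    System.out.println(Arrays.asList(recovered));
  }
}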

That, in essence, is the startup flow of spark-shell and spark-submit.
