天天看點

3. Spark源碼解析之Master啟動流程解析

啟動Spark的Master,不管是執行start-all.sh還是直接在master節點直接啟動start-master.sh,都會進入spark-master.sh開始。這裡隻是對Master的啟動流程進行追蹤,腳本的具體解析可以參照前篇,具體的Master執行個體化後面再解讀。

腳本解析:Spark源碼解析之啟動腳本解析

首先進入spark-master.sh

start-master.sh

這個腳本最終調用了spark-daemon.sh。因為沒有輸入參數,是以$ORIGINAL_ARGS為空。最後傳入參數:

start org.apache.spark.deploy.master.Master 1 --host hostname --port 7077 --webui-port 8080

# 啟動spark-daemon腳本,參數為:start、$CLASS、1、host、port、webUI-port、$ORIGINAL_ARGS
# 直譯為:
# sbin/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --host cdh-master --port 7077
# --webui-port 8080
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \    
  --host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
  $ORIGINAL_ARGS
           

spark-daemon.sh

腳本最終執行了以下方法,execute_command方法是決定在前台還是背景運作,也就是調用了spark-class:

nice -n 0 /bin/spark-class org.apache.spark.deploy.master.Master --host cdh-master --port 7077 --webui-port 8080

case "$mode" in
  
    # nice -n 指定優先級執行,$SPARK_NICENESS是前面指定的優先級0
    # bash 執行後面的腳本
    # 如果參數是class,即start
    # 直譯為:
    # execute_commamd nice -n 0 bin/spark-class org.apache.spark.master.Master --host cdh-master
    # --port 7077 --webui-port 8080
    (class)	
      execute_command nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class "$command" "[email protected]"
      ;;
 
    # 直譯為:
    # execute_commamd nice -n 0 bin/spark-submit org.apache.spark.master.Master --host cdh-master
    # --port 7077 --webui-port 8080
    (submit)
      execute_command nice -n "$SPARK_NICENESS" bash "${SPARK_HOME}"/bin/spark-submit --class "$command" "[email protected]"
      ;;
 
    (*)	# 其他報錯退出
      echo "unknown mode: $mode"
      exit 1
      ;;
  esac
           

spark-class

這裡調用了org.apache.spark.launcher.Main:

java -Xmx128m -cp ...\jars\* org.apache.spark.launcher.Main org.apache.spark.deploy.master.Master --host cdh-master --port 7077 --webui-port 8080

# 調用執行檔案目錄下的org.apache.spark.launcher.Main方法
# 傳入執行類及參數,解析後傳回執行指令
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "[email protected]"
  printf "%d\0" $?        # 執行完成,添加一個成功的狀态碼0,%d十進制,\0就是0
}
           

org.apache.spark.launcher.Main

這個類是提供spark内部腳本調用的工具類,通過調用其他類建構執行指令,并不是執行入口。

最後根據送出的類型spark-submit或spark-class,建構對應的指令解析對象SparkSubmitCommandBuilder和SparkClassCommandBuilder,再通過buildCommand方法構造執行指令。

因為是啟動master,是以會進入spark-class,也就是解析對象SparkClassCommandBuilder中。

/**
   * main這個類主要是解析參數,把需要的參數放到執行對象中
   * spark-class傳入的參數:
   * org.apache.spark.deploy.master.Master --host cdh-master --port 7077 --webui-port 8080
   */
  public static void main(String[] argsArray) throws Exception {

    //判斷參數清單
    checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");

  /** 
   * 将參數清單放入args集合中
   * 移出第一個參數指派給classname,即執行類:org.apache.spark.deploy.master.Master
   * 剩餘參數為:--host cdh-master --port 7077 --webui-port 8080
   */
    List<String> args = new ArrayList<>(Arrays.asList(argsArray));
    String className = args.remove(0);

    //判斷是否前台列印執行資訊
    boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
    AbstractCommandBuilder builder;
    
    //建構執行程式對象:spark-submit/spark-class
    //把參數都取出并解析,放到不同執行程式的對象中
    //意思是,執行程式在這裡拆分,取到自己需要的執行參數
    if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
      try {
        builder = new SparkSubmitCommandBuilder(args);
      } catch (IllegalArgumentException e) {
        printLaunchCommand = false;
        System.err.println("Error: " + e.getMessage());
        System.err.println();

        MainClassOptionParser parser = new MainClassOptionParser();
        try {
          parser.parse(args);
        } catch (Exception ignored) {
          // Ignore parsing exceptions.
        }

        List<String> help = new ArrayList<>();
        if (parser.className != null) {
          help.add(parser.CLASS);
          help.add(parser.className);
        }
        help.add(parser.USAGE_ERROR);
        // 建構spark-submit指令和對應的參數對象
        builder = new SparkSubmitCommandBuilder(help);
      }
    } else {
      // 建構指令對象,spark-class指令和對應的參數
      builder = new SparkClassCommandBuilder(className, args);
    }

    // 建構要執行的指令
    Map<String, String> env = new HashMap<>();
    List<String> cmd = builder.buildCommand(env);
    if (printLaunchCommand) {
      System.err.println("Spark Command: " + join(" ", cmd));
      System.err.println("========================================");
    }
  }
}
           

SparkClassCommandBuilder

這是建構spark-class執行指令對象,将送出的class類解析成對象,放入classname和對應的參數。完成指令對象的建構後,最後才将對象解析成真正的執行指令。

我們啟動master就是進入了case "org.apache.spark.deploy.master.Master"

// 建構class指令對象,将class和需要的參數放入對象中
class SparkClassCommandBuilder extends AbstractCommandBuilder {

  // 這裡:classname=org.apache.spark.deploy.master.Master
  // classArgs=--host --port 7077 --webui-port 8080
  private final String className;
  private final List<String> classArgs;

  SparkClassCommandBuilder(String className, List<String> classArgs) {
    this.className = className;
    this.classArgs = classArgs;
  }

  @Override
  // 這是建構執行指令的方法
  public List<String> buildCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException {
    List<String> javaOptsKeys = new ArrayList<>();
    String memKey = null;
    String extraClassPath = null;

    // 送出的以下class會由這個類解析為對象
    // Master, Worker, HistoryServer, ExternalShuffleService, MesosClusterDispatcher use
    // SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
    switch (className) {
      case "org.apache.spark.deploy.master.Master":
        // java類: ../bin/java
        javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
        // spark類和參數: org.apache.spark.deploy.master.Master --host --port 7077 --webui-port 8080
        javaOptsKeys.add("SPARK_MASTER_OPTS");
        // 依賴jar包: ../jars/*
        extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
        // 執行記憶體
        memKey = "SPARK_DAEMON_MEMORY";
        break;
      case "org.apache.spark.deploy.worker.Worker":
        javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
        javaOptsKeys.add("SPARK_WORKER_OPTS");
        extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
        memKey = "SPARK_DAEMON_MEMORY";
        break;
      case "org.apache.spark.deploy.history.HistoryServer":
        javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
        javaOptsKeys.add("SPARK_HISTORY_OPTS");
        extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
        memKey = "SPARK_DAEMON_MEMORY";
        break;
      case "org.apache.spark.executor.CoarseGrainedExecutorBackend":
        javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
        memKey = "SPARK_EXECUTOR_MEMORY";
        extraClassPath = getenv("SPARK_EXECUTOR_CLASSPATH");
        break;
      case "org.apache.spark.executor.MesosExecutorBackend":
        javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
        memKey = "SPARK_EXECUTOR_MEMORY";
        extraClassPath = getenv("SPARK_EXECUTOR_CLASSPATH");
        break;
      case "org.apache.spark.deploy.mesos.MesosClusterDispatcher":
        javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
        extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
        memKey = "SPARK_DAEMON_MEMORY";
        break;
      case "org.apache.spark.deploy.ExternalShuffleService":
      case "org.apache.spark.deploy.mesos.MesosExternalShuffleService":
        javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
        javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
        extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
        memKey = "SPARK_DAEMON_MEMORY";
        break;
      default:
        // 會預設設定為driver記憶體,預設1G
        memKey = "SPARK_DRIVER_MEMORY";
        break;
    }

    // 這裡建構了Java執行指令,也就是: .../java -cp
    List<String> cmd = buildJavaCommand(extraClassPath);

    // 判斷是否設定執行記憶體
    for (String key : javaOptsKeys) {
      String envValue = System.getenv(key);
      if (!isEmpty(envValue) && envValue.contains("Xmx")) {
        String msg = String.format("%s is not allowed to specify max heap(Xmx) memory settings " +
                "(was %s). Use the corresponding configuration instead.", key, envValue);
        throw new IllegalArgumentException(msg);
      }
      addOptionString(cmd, envValue);
    }

    // 如果driver記憶體沒有設定,這裡再設定為前面設定的128m,添加到執行指令中: -Xmx1G
    // 最終在這裡把所有配置和參數加入到執行指令中,完成了執行指令的建構
    String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM);
    cmd.add("-Xmx" + mem);
    cmd.add(className);
    cmd.addAll(classArgs);

    // 最後将建構的指令傳回給spark-class執行
    return cmd;
  }
}
           

buildJavaCommand

上面的建構java執行指令的方法,就是加載java環境變量和配置參數,生成java -cp指令

//建構java執行指令: java -cp 
  List<String> buildJavaCommand(String extraClassPath) throws IOException {
    List<String> cmd = new ArrayList<>();
    String envJavaHome;

    // 加載java環境變量
    if (javaHome != null) {
      cmd.add(join(File.separator, javaHome, "bin", "java"));
    } else if ((envJavaHome = System.getenv("JAVA_HOME")) != null) {
        cmd.add(join(File.separator, envJavaHome, "bin", "java"));
    } else {
        cmd.add(join(File.separator, System.getProperty("java.home"), "bin", "java"));
    }

    // 加載java配置參數
    // Load extra JAVA_OPTS from conf/java-opts, if it exists.
    File javaOpts = new File(join(File.separator, getConfDir(), "java-opts"));
    if (javaOpts.isFile()) {
      try (BufferedReader br = new BufferedReader(new InputStreamReader(
          new FileInputStream(javaOpts), StandardCharsets.UTF_8))) {
        String line;
        while ((line = br.readLine()) != null) {
          addOptionString(cmd, line);
        }
      }
    }

    // 完成java設定後,添加-cp
    cmd.add("-cp");
    cmd.add(join(File.pathSeparator, buildClassPath(extraClassPath)));
    return cmd;
  }
           

spark-class

傳回spark-class腳本繼續往下執行,收到了傳回的執行指令,再進行了一些處理,到最後 exec "${CMD[@]}" 才是真正執行了指令啟動master,至此master啟動完成。

# 關閉posix,posix不允許程序替換,并定義一個數組CMD
set +o posix
CMD=()
 
# 調用build_command方法,就是調用org.apache.spark.launcher.Main
# 傳入執行類,解析後得到參數,使用read指令通過while循環讀取放到CMD數組中
while IFS= read -d '' -r ARG; do    #讀取ARG,-d界定符,-r禁止反斜線轉義
  CMD+=("$ARG")
done < <(build_command "[email protected]")
 
# ${#CMD[@]}是CMD數組元素個數,shell的${arr[@]}所有元素,${#arr[@]}數組長度
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))     # 同last = arr.len-1,最後一個元素索引,就是添加的退出碼
LAUNCHER_EXIT_CODE=${CMD[$LAST]}    
 
# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
 
# 檢查退出碼是否為整數,不是則列印除退出碼以外的參數,以錯誤碼退出
# ^比對開始,[0-9]範圍,+出現1次以上,$比對結束
# head -n-1 顯示到倒數第一行
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi
 
# 如果退出碼不為0,将其作為退出碼退出
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi
 
# 執行CMD中的java -cp指令
# exec不會另外啟動程序,這裡這個腳本已經執行完,是以替換掉也不會有影響
# ${CMD[@]:0:$LAST} 去除索引為0的,取1到最後一個元素
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
           

後面解讀Master到底怎麼初始化的。

Master執行個體化流程解析

繼續閱讀