天天看点

3. Spark源码解析之Master启动流程解析

启动Spark的Master,不管是执行start-all.sh还是直接在master节点直接启动start-master.sh,都会进入spark-master.sh开始。这里只是对Master的启动流程进行追踪,脚本的具体解析可以参照前篇,具体的Master实例化后面再解读。

脚本解析:Spark源码解析之启动脚本解析

首先进入spark-master.sh

start-master.sh

这个脚本最终调用了spark-daemon.sh。因为没有输入参数,所以$ORIGINAL_ARGS为空。最后传入参数:

start org.apache.spark.deploy.master.Master 1 --host hostname --port 7077 --webui-port 8080

# 启动spark-daemon脚本,参数为:start、$CLASS、1、host、port、webUI-port、$ORIGINAL_ARGS
# 直译为:
# sbin/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --host cdh-master --port 7077
# --webui-port 8080
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \    
  --host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
  $ORIGINAL_ARGS
           

spark-daemon.sh

脚本最终执行了以下方法,execute_command方法是决定在前台还是后台运行,也就是调用了spark-class:

nice -n 0 /bin/spark-class org.apache.spark.deploy.master.Master --host cdh-master --port 7077 --webui-port 8080

case "$mode" in
  
    # nice -n 指定优先级执行,$SPARK_NICENESS是前面指定的优先级0
    # bash 执行后面的脚本
    # 如果参数是class,即start
    # 直译为:
    # execute_commamd nice -n 0 bin/spark-class org.apache.spark.master.Master --host cdh-master
    # --port 7077 --webui-port 8080
    (class)	
      execute_command nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class "$command" "[email protected]"
      ;;
 
    # 直译为:
    # execute_commamd nice -n 0 bin/spark-submit org.apache.spark.master.Master --host cdh-master
    # --port 7077 --webui-port 8080
    (submit)
      execute_command nice -n "$SPARK_NICENESS" bash "${SPARK_HOME}"/bin/spark-submit --class "$command" "[email protected]"
      ;;
 
    (*)	# 其他报错退出
      echo "unknown mode: $mode"
      exit 1
      ;;
  esac
           

spark-class

这里调用了org.apache.spark.launcher.Main:

java -Xmx128m -cp ...\jars\* org.apache.spark.launcher.Main org.apache.spark.deploy.master.Master --host cdh-master --port 7077 --webui-port 8080

# 调用执行文件目录下的org.apache.spark.launcher.Main方法
# 传入执行类及参数,解析后返回执行命令
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "[email protected]"
  printf "%d\0" $?        # 执行完成,添加一个成功的状态码0,%d十进制,\0就是0
}
           

org.apache.spark.launcher.Main

这个类是提供spark内部脚本调用的工具类,通过调用其他类构建执行命令,并不是执行入口。

最后根据提交的类型spark-submit或spark-class,构建对应的命令解析对象SparkSubmitCommandBuilder和SparkClassCommandBuilder,再通过buildCommand方法构造执行命令。

因为是启动master,所以会进入spark-class,也就是解析对象SparkClassCommandBuilder中。

/**
   * main这个类主要是解析参数,把需要的参数放到执行对象中
   * spark-class传入的参数:
   * org.apache.spark.deploy.master.Master --host cdh-master --port 7077 --webui-port 8080
   */
  public static void main(String[] argsArray) throws Exception {

    //判断参数列表
    checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");

  /** 
   * 将参数列表放入args集合中
   * 移出第一个参数赋值给classname,即执行类:org.apache.spark.deploy.master.Master
   * 剩余参数为:--host cdh-master --port 7077 --webui-port 8080
   */
    List<String> args = new ArrayList<>(Arrays.asList(argsArray));
    String className = args.remove(0);

    //判断是否前台打印执行信息
    boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
    AbstractCommandBuilder builder;
    
    //构建执行程序对象:spark-submit/spark-class
    //把参数都取出并解析,放到不同执行程序的对象中
    //意思是,执行程序在这里拆分,取到自己需要的执行参数
    if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
      try {
        builder = new SparkSubmitCommandBuilder(args);
      } catch (IllegalArgumentException e) {
        printLaunchCommand = false;
        System.err.println("Error: " + e.getMessage());
        System.err.println();

        MainClassOptionParser parser = new MainClassOptionParser();
        try {
          parser.parse(args);
        } catch (Exception ignored) {
          // Ignore parsing exceptions.
        }

        List<String> help = new ArrayList<>();
        if (parser.className != null) {
          help.add(parser.CLASS);
          help.add(parser.className);
        }
        help.add(parser.USAGE_ERROR);
        // 构建spark-submit命令和对应的参数对象
        builder = new SparkSubmitCommandBuilder(help);
      }
    } else {
      // 构建命令对象,spark-class命令和对应的参数
      builder = new SparkClassCommandBuilder(className, args);
    }

    // 构建要执行的命令
    Map<String, String> env = new HashMap<>();
    List<String> cmd = builder.buildCommand(env);
    if (printLaunchCommand) {
      System.err.println("Spark Command: " + join(" ", cmd));
      System.err.println("========================================");
    }
  }
}
           

SparkClassCommandBuilder

这是构建spark-class执行命令对象,将提交的class类解析成对象,放入classname和对应的参数。完成命令对象的构建后,最后才将对象解析成真正的执行命令。

我们启动master就是进入了case "org.apache.spark.deploy.master.Master"

// 构建class命令对象,将class和需要的参数放入对象中
class SparkClassCommandBuilder extends AbstractCommandBuilder {

  // 这里:classname=org.apache.spark.deploy.master.Master
  // classArgs=--host --port 7077 --webui-port 8080
  private final String className;
  private final List<String> classArgs;

  SparkClassCommandBuilder(String className, List<String> classArgs) {
    this.className = className;
    this.classArgs = classArgs;
  }

  @Override
  // 这是构建执行命令的方法
  public List<String> buildCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException {
    List<String> javaOptsKeys = new ArrayList<>();
    String memKey = null;
    String extraClassPath = null;

    // 提交的以下class会由这个类解析为对象
    // Master, Worker, HistoryServer, ExternalShuffleService, MesosClusterDispatcher use
    // SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
    switch (className) {
      case "org.apache.spark.deploy.master.Master":
        // java类: ../bin/java
        javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
        // spark类和参数: org.apache.spark.deploy.master.Master --host --port 7077 --webui-port 8080
        javaOptsKeys.add("SPARK_MASTER_OPTS");
        // 依赖jar包: ../jars/*
        extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
        // 执行内存
        memKey = "SPARK_DAEMON_MEMORY";
        break;
      case "org.apache.spark.deploy.worker.Worker":
        javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
        javaOptsKeys.add("SPARK_WORKER_OPTS");
        extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
        memKey = "SPARK_DAEMON_MEMORY";
        break;
      case "org.apache.spark.deploy.history.HistoryServer":
        javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
        javaOptsKeys.add("SPARK_HISTORY_OPTS");
        extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
        memKey = "SPARK_DAEMON_MEMORY";
        break;
      case "org.apache.spark.executor.CoarseGrainedExecutorBackend":
        javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
        memKey = "SPARK_EXECUTOR_MEMORY";
        extraClassPath = getenv("SPARK_EXECUTOR_CLASSPATH");
        break;
      case "org.apache.spark.executor.MesosExecutorBackend":
        javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
        memKey = "SPARK_EXECUTOR_MEMORY";
        extraClassPath = getenv("SPARK_EXECUTOR_CLASSPATH");
        break;
      case "org.apache.spark.deploy.mesos.MesosClusterDispatcher":
        javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
        extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
        memKey = "SPARK_DAEMON_MEMORY";
        break;
      case "org.apache.spark.deploy.ExternalShuffleService":
      case "org.apache.spark.deploy.mesos.MesosExternalShuffleService":
        javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
        javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
        extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
        memKey = "SPARK_DAEMON_MEMORY";
        break;
      default:
        // 会默认设置为driver内存,默认1G
        memKey = "SPARK_DRIVER_MEMORY";
        break;
    }

    // 这里构建了Java执行命令,也就是: .../java -cp
    List<String> cmd = buildJavaCommand(extraClassPath);

    // 判断是否设置执行内存
    for (String key : javaOptsKeys) {
      String envValue = System.getenv(key);
      if (!isEmpty(envValue) && envValue.contains("Xmx")) {
        String msg = String.format("%s is not allowed to specify max heap(Xmx) memory settings " +
                "(was %s). Use the corresponding configuration instead.", key, envValue);
        throw new IllegalArgumentException(msg);
      }
      addOptionString(cmd, envValue);
    }

    // 如果driver内存没有设置,这里再设置为前面设置的128m,添加到执行命令中: -Xmx1G
    // 最终在这里把所有配置和参数加入到执行命令中,完成了执行命令的构建
    String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM);
    cmd.add("-Xmx" + mem);
    cmd.add(className);
    cmd.addAll(classArgs);

    // 最后将构建的命令返回给spark-class执行
    return cmd;
  }
}
           

buildJavaCommand

上面的构建java执行命令的方法,就是加载java环境变量和配置参数,生成java -cp命令

//构建java执行命令: java -cp 
  List<String> buildJavaCommand(String extraClassPath) throws IOException {
    List<String> cmd = new ArrayList<>();
    String envJavaHome;

    // 加载java环境变量
    if (javaHome != null) {
      cmd.add(join(File.separator, javaHome, "bin", "java"));
    } else if ((envJavaHome = System.getenv("JAVA_HOME")) != null) {
        cmd.add(join(File.separator, envJavaHome, "bin", "java"));
    } else {
        cmd.add(join(File.separator, System.getProperty("java.home"), "bin", "java"));
    }

    // 加载java配置参数
    // Load extra JAVA_OPTS from conf/java-opts, if it exists.
    File javaOpts = new File(join(File.separator, getConfDir(), "java-opts"));
    if (javaOpts.isFile()) {
      try (BufferedReader br = new BufferedReader(new InputStreamReader(
          new FileInputStream(javaOpts), StandardCharsets.UTF_8))) {
        String line;
        while ((line = br.readLine()) != null) {
          addOptionString(cmd, line);
        }
      }
    }

    // 完成java设置后,添加-cp
    cmd.add("-cp");
    cmd.add(join(File.pathSeparator, buildClassPath(extraClassPath)));
    return cmd;
  }
           

spark-class

返回spark-class脚本继续往下执行,收到了返回的执行命令,再进行了一些处理,到最后 exec "${CMD[@]}" 才是真正执行了命令启动master,至此master启动完成。

# 关闭posix,posix不允许进程替换,并定义一个数组CMD
set +o posix
CMD=()
 
# 调用build_command方法,就是调用org.apache.spark.launcher.Main
# 传入执行类,解析后得到参数,使用read命令通过while循环读取放到CMD数组中
while IFS= read -d '' -r ARG; do    #读取ARG,-d界定符,-r禁止反斜线转义
  CMD+=("$ARG")
done < <(build_command "[email protected]")
 
# ${#CMD[@]}是CMD数组元素个数,shell的${arr[@]}所有元素,${#arr[@]}数组长度
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))     # 同last = arr.len-1,最后一个元素索引,就是添加的退出码
LAUNCHER_EXIT_CODE=${CMD[$LAST]}    
 
# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
 
# 检查退出码是否为整数,不是则打印除退出码以外的参数,以错误码退出
# ^匹配开始,[0-9]范围,+出现1次以上,$匹配结束
# head -n-1 显示到倒数第一行
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi
 
# 如果退出码不为0,将其作为退出码退出
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi
 
# 执行CMD中的java -cp命令
# exec不会另外启动进程,这里这个脚本已经执行完,所以替换掉也不会有影响
# ${CMD[@]:0:$LAST} 去除索引为0的,取1到最后一个元素
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
           

后面解读Master到底怎么初始化的。

Master实例化流程解析

继续阅读