啟動Spark的Master,不管是執行start-all.sh還是直接在master節點直接啟動start-master.sh,都會進入spark-master.sh開始。這裡隻是對Master的啟動流程進行追蹤,腳本的具體解析可以參照前篇,具體的Master執行個體化後面再解讀。
腳本解析:Spark源碼解析之啟動腳本解析
首先進入spark-master.sh
start-master.sh
這個腳本最終調用了spark-daemon.sh。因為沒有輸入參數,是以$ORIGINAL_ARGS為空。最後傳入參數:
start org.apache.spark.deploy.master.Master 1 --host hostname --port 7077 --webui-port 8080
# 啟動spark-daemon腳本,參數為:start、$CLASS、1、host、port、webUI-port、$ORIGINAL_ARGS
# 直譯為:
# sbin/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --host cdh-master --port 7077
# --webui-port 8080
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
--host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
$ORIGINAL_ARGS
spark-daemon.sh
腳本最終執行了以下方法,execute_command方法是決定在前台還是背景運作,也就是調用了spark-class:
nice -n 0 /bin/spark-class org.apache.spark.deploy.master.Master --host cdh-master --port 7077 --webui-port 8080
case "$mode" in
# nice -n 指定優先級執行,$SPARK_NICENESS是前面指定的優先級0
# bash 執行後面的腳本
# 如果參數是class,即start
# 直譯為:
# execute_commamd nice -n 0 bin/spark-class org.apache.spark.master.Master --host cdh-master
# --port 7077 --webui-port 8080
(class)
execute_command nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class "$command" "[email protected]"
;;
# 直譯為:
# execute_commamd nice -n 0 bin/spark-submit org.apache.spark.master.Master --host cdh-master
# --port 7077 --webui-port 8080
(submit)
execute_command nice -n "$SPARK_NICENESS" bash "${SPARK_HOME}"/bin/spark-submit --class "$command" "[email protected]"
;;
(*) # 其他報錯退出
echo "unknown mode: $mode"
exit 1
;;
esac
spark-class
這裡調用了org.apache.spark.launcher.Main:
java -Xmx128m -cp ...\jars\* org.apache.spark.launcher.Main org.apache.spark.deploy.master.Master --host cdh-master --port 7077 --webui-port 8080
# 調用執行檔案目錄下的org.apache.spark.launcher.Main方法
# 傳入執行類及參數,解析後傳回執行指令
build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "[email protected]"
printf "%d\0" $? # 執行完成,添加一個成功的狀态碼0,%d十進制,\0就是0
}
org.apache.spark.launcher.Main
這個類是提供spark内部腳本調用的工具類,通過調用其他類建構執行指令,并不是執行入口。
最後根據送出的類型spark-submit或spark-class,建構對應的指令解析對象SparkSubmitCommandBuilder和SparkClassCommandBuilder,再通過buildCommand方法構造執行指令。
因為是啟動master,是以會進入spark-class,也就是解析對象SparkClassCommandBuilder中。
/**
* main這個類主要是解析參數,把需要的參數放到執行對象中
* spark-class傳入的參數:
* org.apache.spark.deploy.master.Master --host cdh-master --port 7077 --webui-port 8080
*/
public static void main(String[] argsArray) throws Exception {
//判斷參數清單
checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
/**
* 将參數清單放入args集合中
* 移出第一個參數指派給classname,即執行類:org.apache.spark.deploy.master.Master
* 剩餘參數為:--host cdh-master --port 7077 --webui-port 8080
*/
List<String> args = new ArrayList<>(Arrays.asList(argsArray));
String className = args.remove(0);
//判斷是否前台列印執行資訊
boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
AbstractCommandBuilder builder;
//建構執行程式對象:spark-submit/spark-class
//把參數都取出并解析,放到不同執行程式的對象中
//意思是,執行程式在這裡拆分,取到自己需要的執行參數
if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
try {
builder = new SparkSubmitCommandBuilder(args);
} catch (IllegalArgumentException e) {
printLaunchCommand = false;
System.err.println("Error: " + e.getMessage());
System.err.println();
MainClassOptionParser parser = new MainClassOptionParser();
try {
parser.parse(args);
} catch (Exception ignored) {
// Ignore parsing exceptions.
}
List<String> help = new ArrayList<>();
if (parser.className != null) {
help.add(parser.CLASS);
help.add(parser.className);
}
help.add(parser.USAGE_ERROR);
// 建構spark-submit指令和對應的參數對象
builder = new SparkSubmitCommandBuilder(help);
}
} else {
// 建構指令對象,spark-class指令和對應的參數
builder = new SparkClassCommandBuilder(className, args);
}
// 建構要執行的指令
Map<String, String> env = new HashMap<>();
List<String> cmd = builder.buildCommand(env);
if (printLaunchCommand) {
System.err.println("Spark Command: " + join(" ", cmd));
System.err.println("========================================");
}
}
}
SparkClassCommandBuilder
這是建構spark-class執行指令對象,将送出的class類解析成對象,放入classname和對應的參數。完成指令對象的建構後,最後才将對象解析成真正的執行指令。
我們啟動master就是進入了case "org.apache.spark.deploy.master.Master"
// 建構class指令對象,将class和需要的參數放入對象中
class SparkClassCommandBuilder extends AbstractCommandBuilder {
// 這裡:classname=org.apache.spark.deploy.master.Master
// classArgs=--host --port 7077 --webui-port 8080
private final String className;
private final List<String> classArgs;
SparkClassCommandBuilder(String className, List<String> classArgs) {
this.className = className;
this.classArgs = classArgs;
}
@Override
// 這是建構執行指令的方法
public List<String> buildCommand(Map<String, String> env)
throws IOException, IllegalArgumentException {
List<String> javaOptsKeys = new ArrayList<>();
String memKey = null;
String extraClassPath = null;
// 送出的以下class會由這個類解析為對象
// Master, Worker, HistoryServer, ExternalShuffleService, MesosClusterDispatcher use
// SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
switch (className) {
case "org.apache.spark.deploy.master.Master":
// java類: ../bin/java
javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
// spark類和參數: org.apache.spark.deploy.master.Master --host --port 7077 --webui-port 8080
javaOptsKeys.add("SPARK_MASTER_OPTS");
// 依賴jar包: ../jars/*
extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
// 執行記憶體
memKey = "SPARK_DAEMON_MEMORY";
break;
case "org.apache.spark.deploy.worker.Worker":
javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
javaOptsKeys.add("SPARK_WORKER_OPTS");
extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
memKey = "SPARK_DAEMON_MEMORY";
break;
case "org.apache.spark.deploy.history.HistoryServer":
javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
javaOptsKeys.add("SPARK_HISTORY_OPTS");
extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
memKey = "SPARK_DAEMON_MEMORY";
break;
case "org.apache.spark.executor.CoarseGrainedExecutorBackend":
javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
memKey = "SPARK_EXECUTOR_MEMORY";
extraClassPath = getenv("SPARK_EXECUTOR_CLASSPATH");
break;
case "org.apache.spark.executor.MesosExecutorBackend":
javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
memKey = "SPARK_EXECUTOR_MEMORY";
extraClassPath = getenv("SPARK_EXECUTOR_CLASSPATH");
break;
case "org.apache.spark.deploy.mesos.MesosClusterDispatcher":
javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
memKey = "SPARK_DAEMON_MEMORY";
break;
case "org.apache.spark.deploy.ExternalShuffleService":
case "org.apache.spark.deploy.mesos.MesosExternalShuffleService":
javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
memKey = "SPARK_DAEMON_MEMORY";
break;
default:
// 會預設設定為driver記憶體,預設1G
memKey = "SPARK_DRIVER_MEMORY";
break;
}
// 這裡建構了Java執行指令,也就是: .../java -cp
List<String> cmd = buildJavaCommand(extraClassPath);
// 判斷是否設定執行記憶體
for (String key : javaOptsKeys) {
String envValue = System.getenv(key);
if (!isEmpty(envValue) && envValue.contains("Xmx")) {
String msg = String.format("%s is not allowed to specify max heap(Xmx) memory settings " +
"(was %s). Use the corresponding configuration instead.", key, envValue);
throw new IllegalArgumentException(msg);
}
addOptionString(cmd, envValue);
}
// 如果driver記憶體沒有設定,這裡再設定為前面設定的128m,添加到執行指令中: -Xmx1G
// 最終在這裡把所有配置和參數加入到執行指令中,完成了執行指令的建構
String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM);
cmd.add("-Xmx" + mem);
cmd.add(className);
cmd.addAll(classArgs);
// 最後将建構的指令傳回給spark-class執行
return cmd;
}
}
buildJavaCommand
上面的建構java執行指令的方法,就是加載java環境變量和配置參數,生成java -cp指令
//建構java執行指令: java -cp
List<String> buildJavaCommand(String extraClassPath) throws IOException {
List<String> cmd = new ArrayList<>();
String envJavaHome;
// 加載java環境變量
if (javaHome != null) {
cmd.add(join(File.separator, javaHome, "bin", "java"));
} else if ((envJavaHome = System.getenv("JAVA_HOME")) != null) {
cmd.add(join(File.separator, envJavaHome, "bin", "java"));
} else {
cmd.add(join(File.separator, System.getProperty("java.home"), "bin", "java"));
}
// 加載java配置參數
// Load extra JAVA_OPTS from conf/java-opts, if it exists.
File javaOpts = new File(join(File.separator, getConfDir(), "java-opts"));
if (javaOpts.isFile()) {
try (BufferedReader br = new BufferedReader(new InputStreamReader(
new FileInputStream(javaOpts), StandardCharsets.UTF_8))) {
String line;
while ((line = br.readLine()) != null) {
addOptionString(cmd, line);
}
}
}
// 完成java設定後,添加-cp
cmd.add("-cp");
cmd.add(join(File.pathSeparator, buildClassPath(extraClassPath)));
return cmd;
}
spark-class
傳回spark-class腳本繼續往下執行,收到了傳回的執行指令,再進行了一些處理,到最後 exec "${CMD[@]}" 才是真正執行了指令啟動master,至此master啟動完成。
# 關閉posix,posix不允許程序替換,并定義一個數組CMD
set +o posix
CMD=()
# 調用build_command方法,就是調用org.apache.spark.launcher.Main
# 傳入執行類,解析後得到參數,使用read指令通過while循環讀取放到CMD數組中
while IFS= read -d '' -r ARG; do #讀取ARG,-d界定符,-r禁止反斜線轉義
CMD+=("$ARG")
done < <(build_command "[email protected]")
# ${#CMD[@]}是CMD數組元素個數,shell的${arr[@]}所有元素,${#arr[@]}數組長度
COUNT=${#CMD[@]}
LAST=$((COUNT - 1)) # 同last = arr.len-1,最後一個元素索引,就是添加的退出碼
LAUNCHER_EXIT_CODE=${CMD[$LAST]}
# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
# 檢查退出碼是否為整數,不是則列印除退出碼以外的參數,以錯誤碼退出
# ^比對開始,[0-9]範圍,+出現1次以上,$比對結束
# head -n-1 顯示到倒數第一行
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
echo "${CMD[@]}" | head -n-1 1>&2
exit 1
fi
# 如果退出碼不為0,将其作為退出碼退出
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
exit $LAUNCHER_EXIT_CODE
fi
# 執行CMD中的java -cp指令
# exec不會另外啟動程序,這裡這個腳本已經執行完,是以替換掉也不會有影響
# ${CMD[@]:0:$LAST} 去除索引為0的,取1到最後一個元素
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
後面解讀Master到底怎麼初始化的。
Master執行個體化流程解析