Whether you launch spark-shell, submit a jar with spark-submit, or run any of the other scripts such as those for the master or worker, everything eventually funnels into spark-class, which invokes the main method of org.apache.spark.launcher.Main to build the command to execute.
java -Xmx128m -cp ...jars org.apache.spark.launcher.Main "$@"
In other words, org.apache.spark.launcher.Main is invoked by spark-class and receives its arguments from it. This class is a utility for Spark's internal scripts, not the real execution entry point. It delegates to other classes to parse the arguments, generates the command to execute, and finally returns that command to spark-class, which runs it with exec "${CMD[@]}".
Based on the submission type, spark-submit or spark-class (master, worker, history server, and so on), it constructs the corresponding command builder, SparkSubmitCommandBuilder or SparkClassCommandBuilder, and then builds the command to execute via the buildCommand method.
Here is a quick look at the arguments SparkSubmit receives in each case; Master and Worker are analyzed later:
| Invocation | SparkSubmit arguments |
| --- | --- |
| spark-shell | `org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell"` |
| spark-submit | `--class com.idmapping.scala.WordCount --master yarn --deploy-mode client --driver-memory 4G --executor-memory 3G --executor-cores 2 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.default.parallelism=24 /user/jars/idmapping-job-1.0-SNAPSHOT.jar file:///user/tmp/words.txt file:///user/data/wordcount/` |
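As an aside, the spark-submit invocation above can also be expressed programmatically through the public SparkLauncher API (the class the Javadoc below mentions), which internally relies on the same SparkSubmitCommandBuilder. A minimal sketch, reusing the class name, jar path, and arguments from the table; it is not from the original code:

```java
import org.apache.spark.launcher.SparkLauncher;

public class SubmitWordCount {
  public static void main(String[] args) throws Exception {
    Process spark = new SparkLauncher()
        .setAppResource("/user/jars/idmapping-job-1.0-SNAPSHOT.jar")
        .setMainClass("com.idmapping.scala.WordCount")
        .setMaster("yarn")
        .setDeployMode("client")
        .setConf(SparkLauncher.DRIVER_MEMORY, "4G")
        .setConf(SparkLauncher.EXECUTOR_MEMORY, "3G")
        .setConf(SparkLauncher.EXECUTOR_CORES, "2")
        .setConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .setConf("spark.default.parallelism", "24")
        .addAppArgs("file:///user/tmp/words.txt", "file:///user/data/wordcount/")
        .launch();               // spawns spark-submit as a child process
    spark.waitFor();
  }
}
```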
/**
* Command line interface for the Spark launcher. Used internally by Spark scripts.
*/
// Utility class for Spark's internal scripts
class Main {
/**
* Usage: Main [class] [class args]
* <p>
* There are two modes: spark-submit and spark-class.
* In spark-class mode the submitted class can be any internal one: master, worker, history server, and so on.
* This CLI works in two different modes:
* <ul>
* <li>"spark-submit": if <i>class</i> is "org.apache.spark.deploy.SparkSubmit", the
* {@link SparkLauncher} class is used to launch a Spark application.</li>
* <li>"spark-class": if another class is provided, an internal Spark class is run.</li>
* </ul>
*
* On Unix-like systems the output arguments form a NUL-separated list; on Windows they form a single space-separated command line.
* This class works in tandem with the "bin/spark-class" script on Unix-like systems, and
* "bin/spark-class2.cmd" batch script on Windows to execute the final command.
* <p>
* On Unix-like systems, the output is a list of command arguments, separated by the NULL
* character. On Windows, the output is a command line suitable for direct execution from the
* script.
*/
/**
* main mainly parses the arguments and stores what is needed in a command builder.
* When spark-shell is launched directly, spark-class passes in:
* org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell"
* --master spark://host:7077
*/
public static void main(String[] argsArray) throws Exception {
// Validate the argument list
checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
/**
* Copy the argument array into the args list.
* Remove the first element and assign it to className, i.e. the program to run.
* The remaining arguments are then:
* --class org.apache.spark.repl.Main --name "Spark shell" --master spark://host:7077
*/
List<String> args = new ArrayList<>(Arrays.asList(argsArray));
String className = args.remove(0);
// Whether to print the launch command (SPARK_PRINT_LAUNCH_COMMAND)
// The command builder to create below
boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
AbstractCommandBuilder builder;
// Build the builder object: spark-submit vs. spark-class
// All arguments are extracted, parsed, and stored in it
// In other words, submit vs. master/worker/etc. is distinguished here, each with its own arguments
if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
// For spark-submit: validation, error messages, and help output
try {
// Build the spark-submit command builder
builder = new SparkSubmitCommandBuilder(args);
} catch (IllegalArgumentException e) {
printLaunchCommand = false;
System.err.println("Error: " + e.getMessage());
System.err.println();
// Re-parse only to recover the class name, e.g. --class org.apache.spark.repl.Main
MainClassOptionParser parser = new MainClassOptionParser();
try {
parser.parse(args);
} catch (Exception ignored) {
// Ignore parsing exceptions.
}
// Help message
List<String> help = new ArrayList<>();
if (parser.className != null) {
help.add(parser.CLASS);
help.add(parser.className);
}
help.add(parser.USAGE_ERROR);
// Build a spark-submit builder that will print usage information
builder = new SparkSubmitCommandBuilder(help);
}
} else {
// Build the spark-class command builder
// The command object and its arguments are parsed inside that class
builder = new SparkClassCommandBuilder(className, args);
}
// Only here is the command actually built,
// by calling the builder's buildCommand method.
// Environment variables for the child process are collected in env as k/v pairs
Map<String, String> env = new HashMap<>();
List<String> cmd = builder.buildCommand(env);
if (printLaunchCommand) {
System.err.println("Spark Command: " + join(" ", cmd));
System.err.println("========================================");
}
// On Windows, print a single command line; otherwise print NUL-separated arguments
if (isWindows()) {
System.out.println(prepareWindowsCommand(cmd, env));
} else {
// In bash, use NULL as the arg separator since it cannot be used in an argument.
List<String> bashCmd = prepareBashCommand(cmd, env);
for (String c : bashCmd) {
System.out.print(c);
System.out.print('\0');
}
}
}
// The following handles the Windows case.
// Spark is mostly used on Linux, so Windows gets less attention here;
// this builds the command line to execute on Windows
private static String prepareWindowsCommand(List<String> cmd, Map<String, String> childEnv) {
StringBuilder cmdline = new StringBuilder();
for (Map.Entry<String, String> e : childEnv.entrySet()) {
cmdline.append(String.format("set %s=%s", e.getKey(), e.getValue()));
cmdline.append(" && ");
}
for (String arg : cmd) {
cmdline.append(quoteForBatchScript(arg));
cmdline.append(" ");
}
return cmdline.toString();
}
/**
* Prepare the command for execution from a bash script. The final command will have commands to
* set up any needed environment variables needed by the child process.
*/
// Prefix the command with the environment variables needed by the child process when launched from a bash script
private static List<String> prepareBashCommand(List<String> cmd, Map<String, String> childEnv) {
if (childEnv.isEmpty()) {
return cmd;
}
List<String> newCmd = new ArrayList<>();
newCmd.add("env");
for (Map.Entry<String, String> e : childEnv.entrySet()) {
newCmd.add(String.format("%s=%s", e.getKey(), e.getValue()));
}
newCmd.addAll(cmd);
return newCmd;
}
// When spark-submit argument parsing fails, this parser makes a second pass to recover just the --class value so the usage message can reference it
private static class MainClassOptionParser extends SparkSubmitOptionParser {
String className;
@Override
protected boolean handle(String opt, String value) {
if (CLASS.equals(opt)) {
className = value;
}
return false;
}
@Override
protected boolean handleUnknown(String opt) {
return false;
}
@Override
protected void handleExtraArgs(List<String> extra) {
}
}
}
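To make the Unix output contract concrete, here is a standalone sketch (not Spark code; the command and SPARK_HOME value are invented) that reproduces what main prints: the env-prefixed command as NUL-separated tokens, which spark-class then reads back into its CMD array:

```java
import java.util.*;

public class LauncherOutputDemo {
  public static void main(String[] args) {
    // Invented stand-ins for builder.buildCommand(env) and its env map
    List<String> cmd = Arrays.asList("java", "-cp", "/opt/spark/jars/*",
        "org.apache.spark.deploy.SparkSubmit",
        "--class", "org.apache.spark.repl.Main");
    Map<String, String> env = Collections.singletonMap("SPARK_HOME", "/opt/spark");

    // prepareBashCommand's idiom: `env KEY=VALUE program args...`
    List<String> bashCmd = new ArrayList<>();
    bashCmd.add("env");
    env.forEach((k, v) -> bashCmd.add(k + "=" + v));
    bashCmd.addAll(cmd);

    // Main's Unix output: each token followed by the NUL character
    for (String c : bashCmd) {
      System.out.print(c);
      System.out.print('\0');
    }
    // spark-class collects the tokens roughly like this:
    //   while IFS= read -d '' -r ARG; do CMD+=("$ARG"); done
    // and finally runs: exec "${CMD[@]}"
  }
}
```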
SparkSubmitCommandBuilder
// --class org.apache.spark.repl.Main --name "Spark shell" --master spark://host:7077
SparkSubmitCommandBuilder(List<String> args) {
this.allowsMixedArguments = false;
this.sparkArgs = new ArrayList<>();
boolean isExample = false;
List<String> submitArgs = args;
if (args.size() > 0) {
// Check whether this is pyspark-shell or sparkR-shell
switch (args.get(0)) {
case PYSPARK_SHELL:
this.allowsMixedArguments = true;
appResource = PYSPARK_SHELL;
submitArgs = args.subList(1, args.size());
break;
case SPARKR_SHELL:
this.allowsMixedArguments = true;
appResource = SPARKR_SHELL;
submitArgs = args.subList(1, args.size());
break;
// Otherwise it may be run-example, one of the bundled example programs
case RUN_EXAMPLE:
isExample = true;
submitArgs = args.subList(1, args.size());
}
this.isExample = isExample;
OptionParser parser = new OptionParser();
parser.parse(submitArgs);
this.isAppResourceReq = parser.isAppResourceReq;
} else {
this.isExample = isExample;
this.isAppResourceReq = false;
}
}
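The first element of args thus acts as a sentinel. A small sketch of the effect; the sentinel string "pyspark-shell-main" is an assumption about the value of the PYSPARK_SHELL constant, and the --name value is invented:

```java
import java.util.*;

public class SentinelDemo {
  public static void main(String[] argv) {
    // Hypothetical pyspark submission (assumed sentinel value)
    List<String> args = Arrays.asList("pyspark-shell-main", "--name", "PySparkShell");
    // Mirror the constructor: the sentinel becomes appResource,
    // the rest is handed to OptionParser.parse
    String appResource = args.get(0);
    List<String> submitArgs = args.subList(1, args.size());
    System.out.println(appResource);  // pyspark-shell-main
    System.out.println(submitArgs);   // [--name, PySparkShell]
  }
}
```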
OptionParser extends SparkSubmitOptionParser, and its parse method is inherited unchanged from SparkSubmitOptionParser. It filters the `--name value` style options that follow, rejecting any that do not match the known option tables.
private class OptionParser extends SparkSubmitOptionParser {}
parse()
// --class org.apache.spark.repl.Main --name "Spark shell" --master spark://host:7077
protected final void parse(List<String> args) {
// Matches options of the form --opt=value, e.g. --name="Spark shell"
Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");
// Iterate over the submitted arguments
int idx = 0;
for (idx = 0; idx < args.size(); idx++) {
String arg = args.get(idx);
String value = null;
// Split the option from its value:
// for --name="Spark shell", arg becomes --name and value becomes "Spark shell"
Matcher m = eqSeparatedOpt.matcher(arg);
if (m.matches()) {
arg = m.group(1);
value = m.group(2);
}
// Look for options with a value.
// A null value here means the option was not of the --opt=value form
// Match arg against the options declared in the opts[] table
String name = findCliOption(arg, opts);
if (name != null) {
if (value == null) {
// e.g. args = [--class, value]: size = 2, idx = 0, so idx < size-1 and a value follows
// if the option is the last argument (idx == size-1), its value is missing: error
if (idx == args.size() - 1) {
throw new IllegalArgumentException(
String.format("Missing argument for option '%s'.", arg));
}
// Otherwise take the next argument as the value
idx++;
value = args.get(idx);
}
// Invoke the handle() callback, overridden in subclasses, with the matched option and value
// If handle() returns false, stop parsing and fall through to handleExtraArgs
if (!handle(name, value)) {
break;
}
continue;
}
// Look for a switch.
// Check for valueless switches such as --help or --verbose
// by matching against the switches[] table
name = findCliOption(arg, switches);
if (name != null) {
if (!handle(name, null)) {
break;
}
continue;
}
// Unknown argument: if handleUnknown() returns false, stop parsing
if (!handleUnknown(arg)) {
break;
}
}
if (idx < args.size()) {
idx++;
}
// Any remaining arguments are passed to handleExtraArgs; how they are treated is up to the subclass
handleExtraArgs(args.subList(idx, args.size()));
}
With the command built, it is returned to spark-class, which finally executes it via exec "${CMD[@]}".
And that, in essence, is the startup flow of spark-shell and spark-submit.