天天看点

spark源码编译和集群部署以及idea中sbt开发环境集成

spark源码编译和集群部署以及idea中sbt开发环境集成

  • ​​源码下载​​
  • ​​源码编译​​
  • ​​maven 下载​​
  • ​​scala 下载​​
  • ​​编译参数​​
  • ​​编译​​
  • ​​编译分发的二进制包​​
  • ​​单机启动​​
  • ​​集群部署​​
  • ​​开发环境集成​​
  • ​​源码编译的3.2.0版本无法在window上直接用spark-shell启动​​
  • ​​总结​​

项目地址:https://gitee.com/jyq_18792721831/studyspark.git

源码下载

打开​​Apache Spark™ - Unified Engine for large-scale data analytics​​,下载源码

spark源码编译和集群部署以及idea中sbt开发环境集成

在下载界面中选择源码下载

spark源码编译和集群部署以及idea中sbt开发环境集成

随便选择哪个地址

spark源码编译和集群部署以及idea中sbt开发环境集成

下载源码后上传到hadoop01服务器的​

​/spark​

​目录下

spark源码编译和集群部署以及idea中sbt开发环境集成

使用​

​tar -zxvf spark-3.2.0.tgz​

​​解压到​

​/spark​

​目录下

spark源码编译和集群部署以及idea中sbt开发环境集成

源码编译

返回下载界面,选择文档,选择最新版或者你下载的版本

spark源码编译和集群部署以及idea中sbt开发环境集成

或者你可以直接打开​​Overview - Spark 3.2.0 Documentation (apache.org)​​界面

选择跳转到打包构建的文档

spark源码编译和集群部署以及idea中sbt开发环境集成

有两种构建方式:maven和sbt(scala build tool)

spark源码编译和集群部署以及idea中sbt开发环境集成

我们选择maven

maven 下载

在编译文档中,要求我们尽可能使用新的maven

spark源码编译和集群部署以及idea中sbt开发环境集成

所以我们使用手动安装的方式,安装最新版

首先打开​​Maven – Welcome to Apache Maven​​,跳转到下载页

spark源码编译和集群部署以及idea中sbt开发环境集成

我们选择最新的二进制包

spark源码编译和集群部署以及idea中sbt开发环境集成

右键拷贝链接,直接在hadoop01上下载

在根目录下创建​

​/maven​

​​目录,然后执行​

​curl -O https://dlcdn.apache.org/maven/maven-3/3.8.4/binaries/apache-maven-3.8.4-bin.tar.gz​

​下载

spark源码编译和集群部署以及idea中sbt开发环境集成

使用​

​tar -zxvf apache-maven-3.8.4-bin.tar.gz​

​解压

spark源码编译和集群部署以及idea中sbt开发环境集成

删除压缩包,并把maven里面的内容移动出来,使得maven的home为​

​/maven​

spark源码编译和集群部署以及idea中sbt开发环境集成

因为之前就在hadoop01上安装过了java,并且配置了环境变量,所以可以直接配置maven的环境变量即可,高版本需要jdk1.7以上

spark源码编译和集群部署以及idea中sbt开发环境集成

配置环境变量

spark源码编译和集群部署以及idea中sbt开发环境集成

然后使用​

​source ~/.bash_profile​

​生效

使用​

​mvn --version​

​验证版本

spark源码编译和集群部署以及idea中sbt开发环境集成

scala 下载

到​​The Scala Programming Language (scala-lang.org)​​进入下载界面

spark源码编译和集群部署以及idea中sbt开发环境集成

选择scala2吧,scala3实在没用过。

拉到最下面,下载二进制包

spark源码编译和集群部署以及idea中sbt开发环境集成

相同的方式,拷贝链接,在hadoop01中下载

spark源码编译和集群部署以及idea中sbt开发环境集成

然后配置环境变量

spark源码编译和集群部署以及idea中sbt开发环境集成

使用​

​source ~/.bash_profile​

​生效后验证版本

spark源码编译和集群部署以及idea中sbt开发环境集成

编译参数

根据文档,首先设置maven的信息

spark源码编译和集群部署以及idea中sbt开发环境集成

我们将​

​MAVEN_OPTS​

​​放入​

​~/.bash_profile​

​中

spark源码编译和集群部署以及idea中sbt开发环境集成

使用​

​source ~/.bash_profile​

​使之生效。

接着往下看构建文档,找到支持hive和jdbc的构建

spark源码编译和集群部署以及idea中sbt开发环境集成

这是maven命令,使用源码包中自带的maven,其中​

​-P​

​​和​

​-D​

​是一些参数。

这些参数和pom.xml文件中的profile和properties有关。

​-D​

​对应properties

​-P​

​对应profile

我们查看pom.xml文件

这只是一部分,具体的可以自行查看

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <!-- 我们使用的是java 11,这个需要进行替换 -->
    <java.version>1.8</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <!-- 因为我们使用的是3.8.1的maven,所以这个也需要替换 -->
    <maven.version>3.6.3</maven.version>
    <exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
    <sbt.project.name>spark</sbt.project.name>
    <slf4j.version>1.7.30</slf4j.version>
    <log4j.version>1.2.17</log4j.version>
    <!-- hadoop 的版本是2.9.2 -->
    <hadoop.version>3.3.1</hadoop.version>
    <protobuf.version>2.5.0</protobuf.version>
    <yarn.version>${hadoop.version}</yarn.version>
    <zookeeper.version>3.6.2</zookeeper.version>
    <curator.version>2.13.0</curator.version>
    <hive.group>org.apache.hive</hive.group>
    <hive.classifier>core</hive.classifier>
    <!-- Version used in Maven Hive dependency -->
    <!-- hive的版本是2.3.9,这个可以不用修改 -->
    <hive.version>2.3.9</hive.version>
    <hive23.version>2.3.9</hive23.version>
    <!-- Version used for internal directory structure -->
    <hive.version.short>2.3</hive.version.short>
    <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
    <kafka.version>2.8.0</kafka.version>
    <!-- After 10.15.1.3, the minimum required version is JDK9 -->
    <derby.version>10.14.2.0</derby.version>
    <parquet.version>1.12.1</parquet.version>
    <orc.version>1.6.11</orc.version>
    <jetty.version>9.4.43.v20210629</jetty.version>
    <jakartaservlet.version>4.0.3</jakartaservlet.version>
    <chill.version>0.10.0</chill.version>
    <ivy.version>2.5.0</ivy.version>
    <oro.version>2.0.8</oro.version>
    <!--
    If you changes codahale.metrics.version, you also need to change
    the link to metrics.dropwizard.io in docs/monitoring.md.
    -->
    <codahale.metrics.version>4.2.0</codahale.metrics.version>
    <avro.version>1.10.2</avro.version>
    <aws.kinesis.client.version>1.12.0</aws.kinesis.client.version>
    <!-- Should be consistent with Kinesis client dependency -->
    <aws.java.sdk.version>1.11.655</aws.java.sdk.version>
    <!-- the producer is used in tests -->
    <aws.kinesis.producer.version>0.12.8</aws.kinesis.producer.version>
    <!--  org.apache.httpcomponents/httpclient-->
    <commons.httpclient.version>4.5.13</commons.httpclient.version>
</properties>      

所以,我们的构建命令中​

​-D​

​要设置的如下

​mvn -Djava.version=11 -Dmaven.version=3.8.1 -Dhadoop.version=2.9.2 -Dscala.version=2.13.8 -DskipTests clean package​

接下来是​

​-P​

​参数

​-P​

​参数对应的是pom.xml中的profile属性

<profile>
      <id>hadoop-2.7</id>
      <properties>
        <hadoop.version>2.7.4</hadoop.version>
        <curator.version>2.7.1</curator.version>
        <commons-io.version>2.4</commons-io.version>
        <hadoop-client-api.artifact>hadoop-client</hadoop-client-api.artifact>
        <hadoop-client-runtime.artifact>hadoop-yarn-api</hadoop-client-runtime.artifact>
        <hadoop-client-minicluster.artifact>hadoop-client</hadoop-client-minicluster.artifact>
      </properties>
    </profile>
    <profile>
      <id>hive-2.3</id>
    </profile>
    <profile>
      <id>yarn</id>
      <modules>
        <module>resource-managers/yarn</module>
        <module>common/network-yarn</module>
      </modules>
    </profile>
    <profile>
      <id>hive-thriftserver</id>
      <modules>
        <module>sql/hive-thriftserver</module>
      </modules>
    </profile>
    <profile>
      <id>scala-2.13</id>
      <properties>
        <scala.version>2.13.5</scala.version>
        <scala.binary.version>2.13</scala.binary.version>
      </properties>
      <build>
        <pluginManagement>
          <plugins>
            <plugin>
              <groupId>net.alchim31.maven</groupId>
              <artifactId>scala-maven-plugin</artifactId>
              <configuration>
                <args>
                  <arg>-unchecked</arg>
                  <arg>-deprecation</arg>
                  <arg>-feature</arg>
                  <arg>-explaintypes</arg>
                  <arg>-target:jvm-1.8</arg>
                  <arg>-Wconf:cat=deprecation:wv,any:e</arg>
                  <arg>-Wconf:cat=scaladoc:wv</arg>
                  <arg>-Wconf:cat=lint-multiarg-infix:wv</arg>
                  <arg>-Wconf:cat=other-nullary-override:wv</arg>
                  <arg>-Wconf:cat=other-match-analysis&site=org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction.catalogFunction:wv</arg>
                  <arg>-Wconf:cat=other-pure-statement&site=org.apache.spark.streaming.util.FileBasedWriteAheadLog.readAll.readFile:wv</arg>
                  <arg>-Wconf:cat=other-pure-statement&site=org.apache.spark.scheduler.OutputCommitCoordinatorSuite.<local OutputCommitCoordinatorSuite>.futureAction:wv</arg>
                  <arg>-Wconf:msg=^(?=.*?method|value|type|object|trait|inheritance)(?=.*?deprecated)(?=.*?since 2.13).+$:s</arg>
                  <arg>-Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is deprecated because it loses precision).+$:s</arg>
                  <arg>-Wconf:msg=Auto-application to \`\(\)\` is deprecated:s</arg>
                  <arg>-Wconf:msg=method with a single empty parameter list overrides method without any parameter list:s</arg>
                  <arg>-Wconf:msg=method without a parameter list overrides a method with a single empty one:s</arg>
                  <arg>-Wconf:cat=deprecation&msg=procedure syntax is deprecated:e</arg>
                </args>
                <compilerPlugins combine.self="override">
                </compilerPlugins>
              </configuration>
            </plugin>
          </plugins>
        </pluginManagement>
      </build>
    </profile>      

所以​

​-P​

​的参数大概就这么多

​mvn -Phadoop-2.7 -Phive-2.3 -Pyarn -Phive-thriftserver -Pscala-2.13 -Djava.version=11 -Dmaven.version=3.8.4 -Dhadoop.version=2.9.2 -Dscala.version=2.13.8 -Dscala.binary.version=2.13 -DskipTests clean package​

编译

如果使用的是scala-2.13版本,那么需要切换scala版本

spark源码编译和集群部署以及idea中sbt开发环境集成

查看​

​change-scala-version.sh​

​的内容,主要是环境变量的配置和pom.xml文件中的版本号的修改。

在​

​pom.xml​

​​中,scala相关的配置是properties,理论上我们使用​

​-D​

​参数就能修改,不过可能有其他的配置,不一定是pom.xml的,反正执行一次最好。

而且,需要注意的是,执行这个命令,必须在spark主目录下,因为在​

​change-scala-version.sh​

​脚本中使用相对路径执行其他脚本,所以如果你在其他位置执行,那么会因为上下文的问题,导致执行失败。

实际上,​

​change-scala-version.sh​

​​脚本还会使用spark主目录下​

​build/mvn​

​程序。

我们使用上面拼接好的命令,在​

​SPARK_HOME​

​下执行

第一次执行,maven 会下载相关的依赖,比较慢

spark源码编译和集群部署以及idea中sbt开发环境集成

哎,依赖是在是多,耐心等待吧

spark源码编译和集群部署以及idea中sbt开发环境集成

等待了一段时间,发现编译错误了

spark源码编译和集群部署以及idea中sbt开发环境集成

很正常,一次成功才不正常。

我们查看错误的堆栈,发现是因为找不到一个为​

​RangePartitioner​

​​的类,查看源码,这是一个在​

​apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala​

​中定义的类

spark源码编译和集群部署以及idea中sbt开发环境集成

问题在于scala-2.13中没有打包生成class文件,而scala-2.12中就会打包生成calss文件

spark源码编译和集群部署以及idea中sbt开发环境集成

生成的class文件存储在​

​/spark/core/target/scala-2.12/classes/org/apache/spark​

​目录下

不知道是不是scala和spark的兼容性问题。(我自己试了好多次,2.12.15可以编译成功,2.13.8就编译失败了。)

在使用scala-2.12编译的时候,编译到spark-sql的时候,卡主,怀疑是内存不足了

spark源码编译和集群部署以及idea中sbt开发环境集成

幸好是虚拟机,可以增加内存,我们尝试将内存增加到4G

然后终止编译,重新编译,并使用​

​-X​

​启动debug输出

经过尝试,调大内存在一定程度上能加快编译速度。

spark源码编译和集群部署以及idea中sbt开发环境集成

第一次编译可以采用一步一步执行,可以先编译后打包,其实打包也包含编译,我单独执行编译是为了验证scala-2.12是否会生成相关的class文件,打包成功如下

spark源码编译和集群部署以及idea中sbt开发环境集成

编译成功就会生成二进制文件,存储于spark根目录的bin目录下

spark源码编译和集群部署以及idea中sbt开发环境集成

其实这只是一部分,这些脚本调用的jar包也会生成。

我们在源码的bin目录下就可以开心的使用spark了

直接启动spark-shell

spark源码编译和集群部署以及idea中sbt开发环境集成

这些版本信息和我们在编译的时候指定的一模一样。

编译分发的二进制包

编译好只是这个环境编译好了,里面既有源码文件,也有可执行文件,我们不能在集群环境的每个节点上都进行编译一次,那么太过费时,而且也不可行。

所以我们需要编译可分发的二进制包,就像官网提供的spark一样,没有源码,全部是执行文件。

在官网中是建议我们使用工具编译分发的二进制包

spark源码编译和集群部署以及idea中sbt开发环境集成

查看这个脚本,发现还是maven命令

spark源码编译和集群部署以及idea中sbt开发环境集成

最后将编译后在bin目录中的内容进行打包而已

spark源码编译和集群部署以及idea中sbt开发环境集成

我们在spark的根目录下执行​

​./dev/make-distribution.sh --name 2.12 --tgz --mvn mvn -Phadoop-2.7 -Phive-2.3 -Pyarn -Phive-thriftserver -Pscala-2.12 -Djava.version=11 -Dmaven.version=3.8.4 -Dhadoop.version=2.9.2 -Dscala.version=2.12.15 -Dscala.binary.version=2.12 -DskipTests clean package​

打包完成后就会生成​

​psark-2.9.2-bin-2.12.tgz​

​压缩包。

spark源码编译和集群部署以及idea中sbt开发环境集成

并不会打印maven的日志,等待就行了,可能需要和maven打包同样长的时间。(2核,8G,大约40分钟)

完成后会在spark根目录下生成可分发二进制文件包

spark源码编译和集群部署以及idea中sbt开发环境集成

实际上当我们使用maven命令完成打包后,只是单独的想打包分发二进制包,那么是完全不需要重新执行maven打包的,毕竟maven打包太慢了。我们可以将spark根目录下的​

​./dev/make-distribution.sh​

​文件中的相关maven的命令跳过,注释掉脚本中的这两行即可。

spark源码编译和集群部署以及idea中sbt开发环境集成

然后把原来脚本中使用maven获取版本号的方式自定义指定即可

spark源码编译和集群部署以及idea中sbt开发环境集成

这样就不会重复执行maven命令了(maven命令太费时间了)

然后在maven打包完成的前提下,执行分发的二进制包的命令​

​./dev/make-distribution.sh --name 2.12 --tgz --mvn mvn -Phadoop-2.7 -Phive-2.3 -Pyarn -Phive-thriftserver -Pscala-2.12 -Djava.version=11 -Dmaven.version=3.8.4 -Dhadoop.version=2.9.2 -Dscala.version=2.12.15 -Dscala.binary.version=2.12 -DskipTests clean package​

spark源码编译和集群部署以及idea中sbt开发环境集成

spark源码编译实际上就是maven编译,如果你了解maven编译,那么在spark源码编译中遇到的问题,你都能很好的解决。

而二进制分发包的打包脚本,就是对maven编译的使用,然后在把编译后的文件重新组合。

单机启动

我们将编译好的二进制包,从hadoop01分发到hadoop02上,使用​

​scp spark-3.2.0-bin-2.12.tgz hadoop02:/spark/​

spark源码编译和集群部署以及idea中sbt开发环境集成

然后在hadoop02上解压,​

​tar -zxvf spark-3.2.0-bin-2.12.tgz​

我们选择在hadoop02上验证二进制分发包是否可用。

spark源码编译和集群部署以及idea中sbt开发环境集成

然后将​

​SPARK_HOME​

​设置到环境变量,并使之生效

spark源码编译和集群部署以及idea中sbt开发环境集成

然后你可以在任意位置使用​

​spark-shell​

​启动

相关的文档在这里​​Quick Start - Spark 3.2.0 Documentation (apache.org)​​

spark源码编译和集群部署以及idea中sbt开发环境集成
spark源码编译和集群部署以及idea中sbt开发环境集成

启动后如下

spark源码编译和集群部署以及idea中sbt开发环境集成

可见启动spark并不需要scala环境,应该把scala的相关的包集成进去了

我们尝试一下quick-started中的例子

spark源码编译和集群部署以及idea中sbt开发环境集成

然后打开​

​http://hadoop02:4040​

​查看执行历史

spark源码编译和集群部署以及idea中sbt开发环境集成

这种启动方式,在启动中会打印如下数据

spark源码编译和集群部署以及idea中sbt开发环境集成

这其实是spark的standalone的启动方式

相关的文档在这​​Overview - Spark 3.2.0 Documentation (apache.org)​​

spark源码编译和集群部署以及idea中sbt开发环境集成

关于standalone的模式的相关文档

spark源码编译和集群部署以及idea中sbt开发环境集成

​​Spark Standalone Mode - Spark 3.2.0 Documentation (apache.org)​​

集群部署

为什么直接启动​

​spark-shell​

​就是单机版的模式呢?

spark的配置和hadoop差不多,有个slaves文件,里面会指定从节点的域名

默认的配置在​

​/spark/conf​

​文件里面

spark源码编译和集群部署以及idea中sbt开发环境集成

但是你在这里面是找不到slaves文件的,不过有一个workers.template文件

查看这个文件,默认是localhost

spark源码编译和集群部署以及idea中sbt开发环境集成

在sbin目录下有一个​

​slaves.sh​

​文件

spark源码编译和集群部署以及idea中sbt开发环境集成

​slaves.sh​

​​文件跳转到了​

​workers.sh​

​文件中了

spark源码编译和集群部署以及idea中sbt开发环境集成

通过这个脚本基本上就能明白了,spark的从节点配置在​

​/sbin/spark-config.sh​

​​和​

​/conf​

​中都可以配置

其中所有的配置都可以在​

​/conf/spark-env.sh​

​中配置

这些是一些standalone的配置

spark源码编译和集群部署以及idea中sbt开发环境集成

基本上都是通用的。

我们搭建一个spark的集群,集群信息如下

节点 角色 其他
hadoop01 主节点 一个执行器
hadoop02 从节点 一个执行器
hadoop03 从节点 一个执行器

完整的​

​/conf/spark-env.sh​

​如下

# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program

# hadoop 的配置信息
HADOOP_CONF_DIR=/hadoop/etc/hadoop

# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos

# spark shuffle的数据目录,
SPARK_LOCAL_DIRS=/spark/shuffle_data

# Options read in YARN client/cluster mode
# - SPARK_CONF_DIR, Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - YARN_CONF_DIR, to point Spark towards YARN configuration files when you use YARN
# - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G)
# - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G)

# yarn的配置文件在hadoop的配置文件中
YARN_CONF_DIR=$HADOOP_CONF_DIR
# 每个节点启动几个执行器
SPARK_EXECUTOR_CORES=1
# 每个执行器可以使用多大内存
SPARK_EXECUTOR_MEMORY=1800M
# 每个driver可以使用多大内存
SPARK_DRIVER_MEMORY=1800M

# Options for the daemons used in the standalone deploy mode
# - SPARK_MASTER_HOST, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_DAEMON_CLASSPATH, to set the classpath for all daemons
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers

# 主节点
SPARK_MASTER_HOST=hadoop01
# 端口,不要和worker的端口重复,通信端口
SPARK_MASTER_PORT=4040
# 界面端口
SPARK_MASTER_WEBUI_PORT=8089
# 每个从节点启动的执行器
SPARK_WORKER_CORES=1
# 每个从节点可使用最大内存
SPARK_WORKER_MEMORY=1800M
# 从节点的通信端口
SPARK_WORKER_PORT=4040
# 界面端口
SPARK_WORKER_WEBUI_PORT=8089
# 从节点工作目录
SPARK_WORKER_DIR=/spark/worker

# Options for launcher
# - SPARK_LAUNCHER_OPTS, to set config properties and Java options for the launcher (e.g. "-Dx=y")

# Generic options for the daemons used in the standalone deploy mode
# - SPARK_CONF_DIR      Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - SPARK_LOG_DIR       Where log files are stored.  (Default: ${SPARK_HOME}/logs)
# - SPARK_LOG_MAX_FILES Max log files of Spark daemons can rotate to. Default is 5.
# - SPARK_PID_DIR       Where the pid file is stored. (Default: /tmp)
# - SPARK_IDENT_STRING  A string representing this instance of spark. (Default: $USER)
# - SPARK_NICENESS      The scheduling priority for daemons. (Default: 0)
# - SPARK_NO_DAEMONIZE  Run the proposed command in the foreground. It will not output a PID file.
# Options for native BLAS, like Intel MKL, OpenBLAS, and so on.
# You might get better performance to enable these options if using native BLAS (see SPARK-21305).
# - MKL_NUM_THREADS=1        Disable multi-threading of Intel MKL
# - OPENBLAS_NUM_THREADS=1   Disable multi-threading of OpenBLAS

# spark 的日志目录
SPARK_LOG_DIR=/spark/logs      

相关的配置项说明在这​​Spark Standalone Mode - Spark 3.2.0 Documentation (apache.org)​​

spark源码编译和集群部署以及idea中sbt开发环境集成

然后配置workers

spark源码编译和集群部署以及idea中sbt开发环境集成

接着把这个配置好的文件分发到hadoop02的​

​/spark目录下​

使用​

​scp -r /spark hadoop02:/spark/​

spark源码编译和集群部署以及idea中sbt开发环境集成

然后在hadoop01上启动(当然如果你在hadoop01上配置那么就可以不需要从hadoop02分发到hadoop01上了)

别忘记​

​SPARK_HOME​

spark源码编译和集群部署以及idea中sbt开发环境集成

接着切换到​

​/spark/sbin​

​目录下,执行启动脚本

spark源码编译和集群部署以及idea中sbt开发环境集成

启动过程中提示hadoop03还没有相关文件

spark源码编译和集群部署以及idea中sbt开发环境集成

并且提示我们hadoop02的JAVA_HOME没有设置,这就离谱了,我们第一个配置的就是JAVA_HOME。

错误信息中给出了启动worker的命令,我们直接拷贝到hadoop02上执行看下,发现是可以执行的

spark源码编译和集群部署以及idea中sbt开发环境集成

接着我们在界面中看下

spark源码编译和集群部署以及idea中sbt开发环境集成

到worker里面看下,发现和我们配置的一样

spark源码编译和集群部署以及idea中sbt开发环境集成

查看java进程,发现master已经在hadoop01上启动

spark源码编译和集群部署以及idea中sbt开发环境集成

hadoop02启动的是worker

spark源码编译和集群部署以及idea中sbt开发环境集成

现在剩下一个问题,在hadoop01上启动,无法启动hadoop02的worker

我们查看下​

​start-all.sh​

实际上​

​start-all.sh​

​​启动了​

​start-master.sh​

​​和​

​start-worker.sh​

spark源码编译和集群部署以及idea中sbt开发环境集成

重点看​

​start-worker.sh​

spark源码编译和集群部署以及idea中sbt开发环境集成

可以看到会执行这两个脚本,用于处理配置,我们把java环境在​

​spark-config.sh​

​中设置下

spark源码编译和集群部署以及idea中sbt开发环境集成

然后重新分发并重启

spark源码编译和集群部署以及idea中sbt开发环境集成

就没有报错,并且在hadoop02上成功启动了

spark源码编译和集群部署以及idea中sbt开发环境集成

将​

​/spark​

​目录分发到hadoop03并进行重启

spark源码编译和集群部署以及idea中sbt开发环境集成
spark源码编译和集群部署以及idea中sbt开发环境集成

需要注意,我们配置主节点和从节点的通信端口都是4040,界面访问端口都是8089。这也就是意味着,如果要在hadoop01上启动Worker ,会出现端口冲突。

不过我们有三个节点,本来就是打算让hadoop01成为master的。

开发环境集成

我们开发spark程序是在idea中开发的,所以需要在idea中安装scala的插件

spark源码编译和集群部署以及idea中sbt开发环境集成

然后在windows环境中配置scala的环境

先把scala的文件从hadoop01上下载到windows中

spark源码编译和集群部署以及idea中sbt开发环境集成

然后配置环境变量

spark源码编译和集群部署以及idea中sbt开发环境集成

接着在idea中指定scala的sdk

spark源码编译和集群部署以及idea中sbt开发环境集成
spark源码编译和集群部署以及idea中sbt开发环境集成

idea会自动扫描全部的scala的sdk

spark源码编译和集群部署以及idea中sbt开发环境集成

选择scala目录

spark源码编译和集群部署以及idea中sbt开发环境集成

有了scala就可以创建spark的项目了,不过还需要scala的打包构建工具,也就是sbt

spark源码编译和集群部署以及idea中sbt开发环境集成

我们到​​sbt - The interactive build tool (scala-sbt.org)​​下载sbt

spark源码编译和集群部署以及idea中sbt开发环境集成

下载zip包就行

spark源码编译和集群部署以及idea中sbt开发环境集成

将下载的sbt解压到windows中,并配置环境变量

spark源码编译和集群部署以及idea中sbt开发环境集成

继续创建项目(scala版本最好选择和我们编译spark相同的版本,当然创建好项目后在修改也是可以的)

spark源码编译和集群部署以及idea中sbt开发环境集成

根据上面这两篇资料的内容,简单了解下sbt。不希望能做到更好,只要能正常使用就行了。

然后重新打开idea,打开新建的studyspark项目

sbt就开始下载依赖了

spark源码编译和集群部署以及idea中sbt开发环境集成

此时sbt加载项目会异常的,因为idea默认使用自己的sbt,而不是我们安装的sbt

spark源码编译和集群部署以及idea中sbt开发环境集成

需要在settings中设置我们自己的sbt

spark源码编译和集群部署以及idea中sbt开发环境集成

重新构建项目

spark源码编译和集群部署以及idea中sbt开发环境集成

会弹出sbt-shell(如果你没有把sbt配置的全部勾选,那么应该是不会弹出sbt-shell的,那么这步可以跳过)

​/*************************跳过开始*****************************​

spark源码编译和集群部署以及idea中sbt开发环境集成

如果你也出现上述提示,那么表示在等待sbt使用默认的配置,从国外下载依赖

我们在sbt的配置中增加如下参数

-Dsbt.override.build.repos=true
-Dsbt.repository.config=E:\sbt\conf\repo.properties      

第一行表示使用全局的仓库配置

第二行则是指定使用的仓库配置的文件路径

spark源码编译和集群部署以及idea中sbt开发环境集成

记得取消这个勾选

spark源码编译和集群部署以及idea中sbt开发环境集成

然后重新导入项目

spark源码编译和集群部署以及idea中sbt开发环境集成

这里发现sbt-shell中文乱码了,很可惜,我尝试了网上找到的方式,都失败了,或许你可以试试,在sbt的参数中增加

-Dfile.encoding=UTF-8
-Dconsold.encoding=GBK      

对我都是无效的😆

​********************跳过结束*********************​

此时项目结构如下

spark源码编译和集群部署以及idea中sbt开发环境集成

项目相关的配置可以在​

​build.sbt​

​中修改

build.sbt如下

// 创建配置,用于标识spark的版本
lazy val sparkVersion = SettingKey[String]("spark-version", "spark version")
// 定义公共的包基础
lazy val packageBaseDir = SettingKey[String]("package-base-dir", "package base dir")
// 抽取公共配置
lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.12.15",
  sbtVersion := "1.6.1",
  sparkVersion := "3.2.0",
  // 定义项目包基础
  packageBaseDir := "com.study.spark",
  // 定义每个项目的包前缀
  ThisProject / idePackagePrefix := Some(packageBaseDir.value + "." + name.value),
  // 定义输出目录
  ideOutputDirectory := Some(file("target")),
  organization := "com.study.spark",
  // 申明源码路径
  sourceDirectories := Seq(
    file("src/main/scala"),
    file("src/main/java"),
    file("src/test/scala"),
    file("src/test/java")
  ),
  // 声明资源路径
  resourceDirectories := Seq(
    file("src/main/resources"),
    file("src/test/resources")
  ),
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
    "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
    "org.apache.spark" %% "spark-mllib" % sparkVersion.value % "provided",
    "org.apache.spark" %% "spark-streaming" % sparkVersion.value % "provided"
  )
)
// 设置根项目
lazy val root = (project in file("."))
  .settings(
    name := "studyspark",
    commonSettings
  )      

接着我们把​

​sbt-assembly​

​插件加入到项目中

spark源码编译和集群部署以及idea中sbt开发环境集成

等待sbt刷新项目后如下

spark源码编译和集群部署以及idea中sbt开发环境集成

因为我们在根项目中是不会进行任何编码的,所以我们可以将根项目中的src目录删除

最终我们的代码是需要上传到git的,所以创建​

​.gitignore​

​​文件,将​

​target/​

​排除

我们创建一个helloworld的模块,验证sbt下的scala是否可用

spark源码编译和集群部署以及idea中sbt开发环境集成
spark源码编译和集群部署以及idea中sbt开发环境集成

别忘记在build.sbt中定义helloworld项目

lazy val helloworld = (project in file("helloworld"))
  .settings(
    name := "helloworld",
    commonSettings
  )      

别忘记创建目录​

​main,test,scala,java,resources​

​等.

创建好后,我们创建如下代码

spark源码编译和集群部署以及idea中sbt开发环境集成

在运行之前需要先执行compile操作

你可以在sbt-shell中执行

spark源码编译和集群部署以及idea中sbt开发环境集成

也可以在sbt工具窗口中执行(我比较喜欢在sbt-shell中操作)

spark源码编译和集群部署以及idea中sbt开发环境集成

点击运行

spark源码编译和集群部署以及idea中sbt开发环境集成

但是现在我们只是能在ide中执行,我们的目的是打出jar包的,所以我们还需要配置​

​sbt-assembly​

​插件

打开​

​sbt-assembly​

​​插件的官方文档​​sbt-assembly (scala-lang.org)​​

lazy val helloworld = (project in file("helloworld"))
  .settings(
    name := "helloworld",
    // 定义主类
  assembly / mainClass := Some(idePackagePrefix.value.get + ".Hello"),
    // 定义jar包名 项目名字_scala版本_项目版本.jar
    assembly / assemblyJarName := name.value + "_" + scalaVersion.value + "_" + version.value + ".jar",
    // 依赖合并策略
    assemblyMergeStrategy := {
      case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
      case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
      case "application.conf"                            => MergeStrategy.concat
      case "unwanted.txt"                                => MergeStrategy.discard
      case x =>
        val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
        oldStrategy(x)
    },
    commonSettings
  )      

然后执行​

​assembly​

​操作,记得切换到helloworld项目下,你也可以在sbt工具窗口执行

spark源码编译和集群部署以及idea中sbt开发环境集成

不过我喜欢在sbt-shell中执行

spark源码编译和集群部署以及idea中sbt开发环境集成

然后耐心的等待吧,第一次会比较慢的

spark源码编译和集群部署以及idea中sbt开发环境集成

可以看到jar包已经生成了

spark源码编译和集群部署以及idea中sbt开发环境集成

我们使用java -jar 执行

spark源码编译和集群部署以及idea中sbt开发环境集成

可以看到打出来的jar包成功执行了

到了这里scala和sbt都集成完毕,此时需要集成spark了

spark的依赖下载会比较慢,而且新版本国内的镜像不一定有,需要从国外下载

spark源码编译和集群部署以及idea中sbt开发环境集成

这里推荐一个Idea的插件:​

​Big Data Tools​

spark源码编译和集群部署以及idea中sbt开发环境集成

它可以连接到远程的hadoop,hdfs,spark的集群上,用于监控和交互,安装后重启idea会在下面出现工具窗口

spark源码编译和集群部署以及idea中sbt开发环境集成

支持的还是挺多的

spark源码编译和集群部署以及idea中sbt开发环境集成

首先我们将插件和hadoop01进行连接

spark源码编译和集群部署以及idea中sbt开发环境集成

展示如下,和我们在浏览器查看的效果是一样的

spark源码编译和集群部署以及idea中sbt开发环境集成

连接hdfs,需要注意,填写的hdfs的通信地址,不是浏览器地址

spark源码编译和集群部署以及idea中sbt开发环境集成

连接上之后,就可以像操作idea里面的项目文件一样,上传下载等

spark源码编译和集群部署以及idea中sbt开发环境集成

接着我们启动hadoop01的spark集群

spark源码编译和集群部署以及idea中sbt开发环境集成

可惜了,连接spark还是有问题

spark源码编译和集群部署以及idea中sbt开发环境集成

到了这里,工具装了一大堆,但是还没开始开发一行代码,接下来就开发一个小案例wordcount试试吧

当我们安装了​

​big data tools​

​​插件后,新建module的时候,就可以选择​

​big data tools​

​​下的​

​spark​

​模板了

spark源码编译和集群部署以及idea中sbt开发环境集成

这是一个类似于spring boot的模板,里面自动创建一些文件和配置,更准确应该是和maven比较像

spark源码编译和集群部署以及idea中sbt开发环境集成

加载完毕后项目结构如下

spark源码编译和集群部署以及idea中sbt开发环境集成

不过这种创建项目只是适合创建顶级项目,对于子级项目是不行的,会覆盖已有的build.sbt等信息的

我们前面已经搭建好了顶级项目,所以我们可以直接创建一个scala项目就行了

spark源码编译和集群部署以及idea中sbt开发环境集成

然后指定模块的名字就行了

spark源码编译和集群部署以及idea中sbt开发环境集成

我们做个优化,把打包的配置,除了主类之外的,都放在公共配置中

// 抽取公共配置
lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.12.15",
  sbtVersion := "1.6.1",
  sparkVersion := "3.2.0",
  // 定义项目包基础
  packageBaseDir := "com.study.spark",
  // 定义每个项目的包前缀
  ThisProject / idePackagePrefix := Some(packageBaseDir.value + "." + name.value),
  // 定义输出目录
  ideOutputDirectory := Some(file("target")),
  organization := "com.study.spark",
  // 申明源码路径
  sourceDirectories := Seq(
    file("src/main/scala"),
    file("src/main/java"),
    file("src/test/scala"),
    file("src/test/java")
  ),
  // 声明资源路径
  resourceDirectories := Seq(
    file("src/main/resources"),
    file("src/test/resources")
  ),
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
    "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
    "org.apache.spark" %% "spark-mllib" % sparkVersion.value % "provided",
    "org.apache.spark" %% "spark-streaming" % sparkVersion.value % "provided"
  ),
  // 定义jar包名 项目名字_scala版本_项目版本.jar
  ThisProject / assembly / assemblyJarName := name.value + "_" + scalaVersion.value + "_" + version.value + ".jar",
  // 依赖合并策略
  ThisBuild / assemblyMergeStrategy := {
    case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
    case "application.conf"                            => MergeStrategy.concat
    case "unwanted.txt"                                => MergeStrategy.discard
    case x =>
      val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
      oldStrategy(x)
  },
)      

然后把wordcount加入到根项目中,主类先不写

lazy val wordcount = (project in file("wordcount"))
  .settings(
    name := "wordcount",
    commonSettings
  )      

然后创建目录

spark源码编译和集群部署以及idea中sbt开发环境集成

我们在scala目录下创建主类

spark源码编译和集群部署以及idea中sbt开发环境集成

首先使用本地进行执行

package com.study.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    // 配置spark
    val conf = new SparkConf().setMaster("local").setAppName("wordcount")
    // 获取sparkContext
    val sc = new SparkContext(conf)
    // 从本地读取文件,其实就是 build.sbt 文件
    sc.textFile("file:///E:\\java\\studyspark\\build.sbt")
      // 按照空格拆分
      .flatMap(_.split(" "))
      // 每个单词计数1
      .map(_->1)
      // 按单词分组统计
      .reduceByKey(_+_)
      // 计算
      .collect()
      // 打印统计结果
      .foreach(println(_))
  }

}      

别忘记在build.sbt中配置主类(之前我们只有一个有效的项目,所以我们可以直接取参数的值,但是现在有了多个项目,就需要在取值的时候,限定作用域)

spark源码编译和集群部署以及idea中sbt开发环境集成

需要注意的是我们配置的spark依赖是不会在classpath中使用的,因为默认当我们把spark任务的jar包提交给spark集群的时候,spark相关的依赖是不需要在jar包中的。

但是当我们需要本地启动的时候,如果没有spark的依赖,会导致spark相关的类找不到。

所以我们需要在本地启动的spark项目中,再次配置spark的依赖,区别是不写后面​

​provided​

​作用域

spark源码编译和集群部署以及idea中sbt开发环境集成

然后切换到wordcount项目下,编译并执行(使用run执行,而不是使用ide的执行按钮)

spark源码编译和集群部署以及idea中sbt开发环境集成

如果一切顺利,就会看到执行的日志

spark源码编译和集群部署以及idea中sbt开发环境集成

我们也可以打包的,使用​

​sbt-assembly​

​打包

spark源码编译和集群部署以及idea中sbt开发环境集成

然后使用java -jar 执行

执行会抛出异常,这是因为缺少了scala的依赖,我们需要把scala的依赖也加入

spark源码编译和集群部署以及idea中sbt开发环境集成

我们要求scala的依赖每次都必须打包,注意这里是一个​

​%​

打出来的jar包不能简单的使用java -jar 运行,需要提交到spark环境中执行,因为我们在打包的时候并没有打spark相关的包。

为了能在服务器上使用,我们需要修改我们的代码,需要链接到服务器上的spark集群,而且读取文件应该读取hdfs中的文件

首先启动服务器上的hdfs服务

spark源码编译和集群部署以及idea中sbt开发环境集成

接着启动spark集群

spark源码编译和集群部署以及idea中sbt开发环境集成

接着把build.sbt上传到hdfs中

spark源码编译和集群部署以及idea中sbt开发环境集成

然后重新打包,并上传到服务器

spark源码编译和集群部署以及idea中sbt开发环境集成

然后使用

spark-submit --master spark://hadoop01:4040 --class com.study.spark.wordcount.WordCount /spark/wordcount_2.12.15_1.0.jar      

提交到spark集群,spark就开始执行了

spark源码编译和集群部署以及idea中sbt开发环境集成

执行结果和在本地执行一模一样

spark源码编译和集群部署以及idea中sbt开发环境集成

而且在spark的界面中也能看到

spark源码编译和集群部署以及idea中sbt开发环境集成

如果你还没有配置历史服务,那么会启动报错的,提示没有配置历史数据存储路径

spark源码编译和集群部署以及idea中sbt开发环境集成

拷贝spark-defaults.conf.template为spark-defaults.conf

spark源码编译和集群部署以及idea中sbt开发环境集成

内容如下

# 是否开启日志
spark.eventLog.enabled true
# 日志存储路径
spark.eventLog.dir hdfs://hadoop01:8020/logs
# 是否压缩
spark.eventLog.compress true      

同时在spark-env.sh中配置以下内容

# 配置spark历史服务
SPARK_HISTORY_OPTS="-Dspark.history.ui.port=8086 -Dspark.history.retainedApplications=10 -Dspark.history.fs.logDirectory=hdfs://hadoop01:8020/logs"      

我们启动spark历史记录服务

spark源码编译和集群部署以及idea中sbt开发环境集成

启动历史记录服务不报错

spark源码编译和集群部署以及idea中sbt开发环境集成

而且界面能打开

spark源码编译和集群部署以及idea中sbt开发环境集成

然后在执行一次,因为我们的任务很快,所以开启历史记录,方便我们查看job的信息

这时候就能在历史界面查看了

spark源码编译和集群部署以及idea中sbt开发环境集成
spark源码编译和集群部署以及idea中sbt开发环境集成

我们实际上是想看看dag图

spark源码编译和集群部署以及idea中sbt开发环境集成

很漂亮。

源码编译的3.2.0版本无法在window上直接用spark-shell启动

这个问题我发现不仅仅是我们自己编译的二进制分发包有问题,就连官方下载的二进制分发包,如果你直接使用​

​spark-shell​

​​启动,也是会无法启动的。这个问题困扰我了好几天,我在网上找到了相关的讨论​​windows - Spark illegal character in path - Stack Overflow​​

但是也没给出问题原因和解决方案,只是说降低版本。

之前是源码编译二进制包,所以我也有spark的源码,我自己根据堆栈找了好长时间,奈何自己水平太低,从源码中没有找到相关的代码。

不过从我自己理解的源码来看,目前好像就是​

​spark-shell​

​的repl的启动存在问题,而提交作业等貌似是没有问题的。(我自己没试)

总结

整个文章的过程比较坎坷,比较艰难。

我是在看书的时候,知道了scala的sdk的二进制不兼容,以及目前市面上大多数公司使用cdh的发行包,于是我也到cdh的官网想下载最新的发行包,结果发现cdh版本貌似也开始收费了,于是只能自己使用源码编译。

源码编译时第一个阶段,当我历经艰难,完成了源码编译后,发现不能在windows上使用spark-shell,于是在服务器集群上搭建spark集群。这里主要学习了spark的启动和集群部署,以及一些配置等。

上述这两个阶段基本上是年前完成的,在搭建spark开发环境的时候,才知道spark或者说scala推荐使用sbt编译工具,可怜我maven都不是很熟悉,我哪会sbt呢。没辙,想办法研究sbt,研究sbt刚开始,就过年了,中间大概10多天吧,就是啥都没做,新年开工,才开始正式研究sbt,研究几天,基本上做到了入门,就继续在idea中集成spark,sbt环境。

最终实现了idea本地开发编码,本地在sbt的基础上执行,然后打包提交到spark集群中执行。

继续阅读