spark源码编译和集群部署以及idea中sbt开发环境集成
- 源码下载
- 源码编译
- maven 下载
- scala 下载
- 编译参数
- 编译
- 编译分发的二进制包
- 单机启动
- 集群部署
- 开发环境集成
- 源码编译的3.2.0版本无法在window上直接用spark-shell启动
- 总结
项目地址:https://gitee.com/jyq_18792721831/studyspark.git
源码下载
打开Apache Spark™ - Unified Engine for large-scale data analytics,下载源码

在下载界面中选择源码下载
随便选择哪个地址
下载源码后上传到hadoop01服务器的
/spark
目录下
使用
tar -zxvf spark-3.2.0.tgz
解压到
/spark
目录下
源码编译
返回下载界面,选择文档,选择最新版或者你下载的版本
或者你可以直接打开Overview - Spark 3.2.0 Documentation (apache.org)界面
选择跳转到打包构建的文档
有两种构建方式:maven和sbt(scala build tool)
我们选择maven
maven 下载
在编译文档中,要求我们尽可能使用新的maven
所以我们使用手动安装的方式,安装最新版
首先打开Maven – Welcome to Apache Maven,跳转到下载页
我们选择最新的二进制包
右键拷贝链接,直接在hadoop01上下载
在根目录下创建
/maven
目录,然后执行
curl -O https://dlcdn.apache.org/maven/maven-3/3.8.4/binaries/apache-maven-3.8.4-bin.tar.gz
下载
使用
tar -zxvf apache-maven-3.8.4-bin.tar.gz
解压
删除压缩包,并把maven里面的内容移动出来,使得maven的home为
/maven
因为之前就在hadoop01上安装过了java,并且配置了环境变量,所以可以直接配置maven的环境变量即可,高版本需要jdk1.7以上
配置环境变量
然后使用
source ~/.bash_profile
生效
使用
mvn --version
验证版本
scala 下载
到The Scala Programming Language (scala-lang.org)进入下载界面
选择scala2吧,scala3实在没用过。
拉到最下面,下载二进制包
相同的方式,拷贝链接,在hadoop01中下载
然后配置环境变量
使用
source ~/.bash_profile
生效后验证版本
编译参数
根据文档,首先设置maven的信息
我们将
MAVEN_OPTS
放入
~/.bash_profile
中
使用
source ~/.bash_profile
使之生效。
接着往下看构建文档,找到支持hive和jdbc的构建
这是maven命令,使用源码包中自带的maven,其中
-P
和
-D
是一些参数。
这些参数和pom.xml文件中的profile和properties有关。
-D
对应properties
-P
对应profile
我们查看pom.xml文件
这只是一部分,具体的可以自行查看
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- 我们使用的是java 11,这个需要进行替换 -->
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<!-- 因为我们使用的是3.8.1的maven,所以这个也需要替换 -->
<maven.version>3.6.3</maven.version>
<exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
<sbt.project.name>spark</sbt.project.name>
<slf4j.version>1.7.30</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<!-- hadoop 的版本是2.9.2 -->
<hadoop.version>3.3.1</hadoop.version>
<protobuf.version>2.5.0</protobuf.version>
<yarn.version>${hadoop.version}</yarn.version>
<zookeeper.version>3.6.2</zookeeper.version>
<curator.version>2.13.0</curator.version>
<hive.group>org.apache.hive</hive.group>
<hive.classifier>core</hive.classifier>
<!-- Version used in Maven Hive dependency -->
<!-- hive的版本是2.3.9,这个可以不用修改 -->
<hive.version>2.3.9</hive.version>
<hive23.version>2.3.9</hive23.version>
<!-- Version used for internal directory structure -->
<hive.version.short>2.3</hive.version.short>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
<kafka.version>2.8.0</kafka.version>
<!-- After 10.15.1.3, the minimum required version is JDK9 -->
<derby.version>10.14.2.0</derby.version>
<parquet.version>1.12.1</parquet.version>
<orc.version>1.6.11</orc.version>
<jetty.version>9.4.43.v20210629</jetty.version>
<jakartaservlet.version>4.0.3</jakartaservlet.version>
<chill.version>0.10.0</chill.version>
<ivy.version>2.5.0</ivy.version>
<oro.version>2.0.8</oro.version>
<!--
If you changes codahale.metrics.version, you also need to change
the link to metrics.dropwizard.io in docs/monitoring.md.
-->
<codahale.metrics.version>4.2.0</codahale.metrics.version>
<avro.version>1.10.2</avro.version>
<aws.kinesis.client.version>1.12.0</aws.kinesis.client.version>
<!-- Should be consistent with Kinesis client dependency -->
<aws.java.sdk.version>1.11.655</aws.java.sdk.version>
<!-- the producer is used in tests -->
<aws.kinesis.producer.version>0.12.8</aws.kinesis.producer.version>
<!-- org.apache.httpcomponents/httpclient-->
<commons.httpclient.version>4.5.13</commons.httpclient.version>
</properties>
所以,我们的构建命令中
-D
要设置的如下
mvn -Djava.version=11 -Dmaven.version=3.8.1 -Dhadoop.version=2.9.2 -Dscala.version=2.13.8 -DskipTests clean package
接下来是
-P
参数
-P
参数对应的是pom.xml中的profile属性
<profile>
<id>hadoop-2.7</id>
<properties>
<hadoop.version>2.7.4</hadoop.version>
<curator.version>2.7.1</curator.version>
<commons-io.version>2.4</commons-io.version>
<hadoop-client-api.artifact>hadoop-client</hadoop-client-api.artifact>
<hadoop-client-runtime.artifact>hadoop-yarn-api</hadoop-client-runtime.artifact>
<hadoop-client-minicluster.artifact>hadoop-client</hadoop-client-minicluster.artifact>
</properties>
</profile>
<profile>
<id>hive-2.3</id>
</profile>
<profile>
<id>yarn</id>
<modules>
<module>resource-managers/yarn</module>
<module>common/network-yarn</module>
</modules>
</profile>
<profile>
<id>hive-thriftserver</id>
<modules>
<module>sql/hive-thriftserver</module>
</modules>
</profile>
<profile>
<id>scala-2.13</id>
<properties>
<scala.version>2.13.5</scala.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<configuration>
<args>
<arg>-unchecked</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
<arg>-explaintypes</arg>
<arg>-target:jvm-1.8</arg>
<arg>-Wconf:cat=deprecation:wv,any:e</arg>
<arg>-Wconf:cat=scaladoc:wv</arg>
<arg>-Wconf:cat=lint-multiarg-infix:wv</arg>
<arg>-Wconf:cat=other-nullary-override:wv</arg>
<arg>-Wconf:cat=other-match-analysis&site=org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction.catalogFunction:wv</arg>
<arg>-Wconf:cat=other-pure-statement&site=org.apache.spark.streaming.util.FileBasedWriteAheadLog.readAll.readFile:wv</arg>
<arg>-Wconf:cat=other-pure-statement&site=org.apache.spark.scheduler.OutputCommitCoordinatorSuite.<local OutputCommitCoordinatorSuite>.futureAction:wv</arg>
<arg>-Wconf:msg=^(?=.*?method|value|type|object|trait|inheritance)(?=.*?deprecated)(?=.*?since 2.13).+$:s</arg>
<arg>-Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is deprecated because it loses precision).+$:s</arg>
<arg>-Wconf:msg=Auto-application to \`\(\)\` is deprecated:s</arg>
<arg>-Wconf:msg=method with a single empty parameter list overrides method without any parameter list:s</arg>
<arg>-Wconf:msg=method without a parameter list overrides a method with a single empty one:s</arg>
<arg>-Wconf:cat=deprecation&msg=procedure syntax is deprecated:e</arg>
</args>
<compilerPlugins combine.self="override">
</compilerPlugins>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</profile>
所以
-P
的参数大概就这么多
mvn -Phadoop-2.7 -Phive-2.3 -Pyarn -Phive-thriftserver -Pscala-2.13 -Djava.version=11 -Dmaven.version=3.8.4 -Dhadoop.version=2.9.2 -Dscala.version=2.13.8 -Dscala.binary.version=2.13 -DskipTests clean package
编译
如果使用的是scala-2.13版本,那么需要切换scala版本
查看
change-scala-version.sh
的内容,主要是环境变量的配置和pom.xml文件中的版本号的修改。
在
pom.xml
中,scala相关的配置是properties,理论上我们使用
-D
参数就能修改,不过可能有其他的配置,不一定是pom.xml的,反正执行一次最好。
而且,需要注意的是,执行这个命令,必须在spark主目录下,因为在
change-scala-version.sh
脚本中使用相对路径执行其他脚本,所以如果你在其他位置执行,那么会因为上下文的问题,导致执行失败。
实际上,
change-scala-version.sh
脚本还会使用spark主目录下
build/mvn
程序。
我们使用上面拼接好的命令,在
SPARK_HOME
下执行
第一次执行,maven 会下载相关的依赖,比较慢
哎,依赖是在是多,耐心等待吧
等待了一段时间,发现编译错误了
很正常,一次成功才不正常。
我们查看错误的堆栈,发现是因为找不到一个为
RangePartitioner
的类,查看源码,这是一个在
apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala
中定义的类
问题在于scala-2.13中没有打包生成class文件,而scala-2.12中就会打包生成calss文件
生成的class文件存储在
/spark/core/target/scala-2.12/classes/org/apache/spark
目录下
不知道是不是scala和spark的兼容性问题。(我自己试了好多次,2.12.15可以编译成功,2.13.8就编译失败了。)
在使用scala-2.12编译的时候,编译到spark-sql的时候,卡主,怀疑是内存不足了
幸好是虚拟机,可以增加内存,我们尝试将内存增加到4G
然后终止编译,重新编译,并使用
-X
启动debug输出
经过尝试,调大内存在一定程度上能加快编译速度。
第一次编译可以采用一步一步执行,可以先编译后打包,其实打包也包含编译,我单独执行编译是为了验证scala-2.12是否会生成相关的class文件,打包成功如下
编译成功就会生成二进制文件,存储于spark根目录的bin目录下
其实这只是一部分,这些脚本调用的jar包也会生成。
我们在源码的bin目录下就可以开心的使用spark了
直接启动spark-shell
这些版本信息和我们在编译的时候指定的一模一样。
编译分发的二进制包
编译好只是这个环境编译好了,里面既有源码文件,也有可执行文件,我们不能在集群环境的每个节点上都进行编译一次,那么太过费时,而且也不可行。
所以我们需要编译可分发的二进制包,就像官网提供的spark一样,没有源码,全部是执行文件。
在官网中是建议我们使用工具编译分发的二进制包
查看这个脚本,发现还是maven命令
最后将编译后在bin目录中的内容进行打包而已
我们在spark的根目录下执行
./dev/make-distribution.sh --name 2.12 --tgz --mvn mvn -Phadoop-2.7 -Phive-2.3 -Pyarn -Phive-thriftserver -Pscala-2.12 -Djava.version=11 -Dmaven.version=3.8.4 -Dhadoop.version=2.9.2 -Dscala.version=2.12.15 -Dscala.binary.version=2.12 -DskipTests clean package
打包完成后就会生成
psark-2.9.2-bin-2.12.tgz
压缩包。
并不会打印maven的日志,等待就行了,可能需要和maven打包同样长的时间。(2核,8G,大约40分钟)
完成后会在spark根目录下生成可分发二进制文件包
实际上当我们使用maven命令完成打包后,只是单独的想打包分发二进制包,那么是完全不需要重新执行maven打包的,毕竟maven打包太慢了。我们可以将spark根目录下的
./dev/make-distribution.sh
文件中的相关maven的命令跳过,注释掉脚本中的这两行即可。
然后把原来脚本中使用maven获取版本号的方式自定义指定即可
这样就不会重复执行maven命令了(maven命令太费时间了)
然后在maven打包完成的前提下,执行分发的二进制包的命令
./dev/make-distribution.sh --name 2.12 --tgz --mvn mvn -Phadoop-2.7 -Phive-2.3 -Pyarn -Phive-thriftserver -Pscala-2.12 -Djava.version=11 -Dmaven.version=3.8.4 -Dhadoop.version=2.9.2 -Dscala.version=2.12.15 -Dscala.binary.version=2.12 -DskipTests clean package
spark源码编译实际上就是maven编译,如果你了解maven编译,那么在spark源码编译中遇到的问题,你都能很好的解决。
而二进制分发包的打包脚本,就是对maven编译的使用,然后在把编译后的文件重新组合。
单机启动
我们将编译好的二进制包,从hadoop01分发到hadoop02上,使用
scp spark-3.2.0-bin-2.12.tgz hadoop02:/spark/
然后在hadoop02上解压,
tar -zxvf spark-3.2.0-bin-2.12.tgz
我们选择在hadoop02上验证二进制分发包是否可用。
然后将
SPARK_HOME
设置到环境变量,并使之生效
然后你可以在任意位置使用
spark-shell
启动
相关的文档在这里Quick Start - Spark 3.2.0 Documentation (apache.org)
启动后如下
可见启动spark并不需要scala环境,应该把scala的相关的包集成进去了
我们尝试一下quick-started中的例子
然后打开
http://hadoop02:4040
查看执行历史
这种启动方式,在启动中会打印如下数据
这其实是spark的standalone的启动方式
相关的文档在这Overview - Spark 3.2.0 Documentation (apache.org)
关于standalone的模式的相关文档
Spark Standalone Mode - Spark 3.2.0 Documentation (apache.org)
集群部署
为什么直接启动
spark-shell
就是单机版的模式呢?
spark的配置和hadoop差不多,有个slaves文件,里面会指定从节点的域名
默认的配置在
/spark/conf
文件里面
但是你在这里面是找不到slaves文件的,不过有一个workers.template文件
查看这个文件,默认是localhost
在sbin目录下有一个
slaves.sh
文件
slaves.sh
文件跳转到了
workers.sh
文件中了
通过这个脚本基本上就能明白了,spark的从节点配置在
/sbin/spark-config.sh
和
/conf
中都可以配置
其中所有的配置都可以在
/conf/spark-env.sh
中配置
这些是一些standalone的配置
基本上都是通用的。
我们搭建一个spark的集群,集群信息如下
节点 | 角色 | 其他 |
hadoop01 | 主节点 | 一个执行器 |
hadoop02 | 从节点 | 一个执行器 |
hadoop03 | 从节点 | 一个执行器 |
完整的
/conf/spark-env.sh
如下
# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
# hadoop 的配置信息
HADOOP_CONF_DIR=/hadoop/etc/hadoop
# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos
# spark shuffle的数据目录,
SPARK_LOCAL_DIRS=/spark/shuffle_data
# Options read in YARN client/cluster mode
# - SPARK_CONF_DIR, Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - YARN_CONF_DIR, to point Spark towards YARN configuration files when you use YARN
# - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G)
# - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G)
# yarn的配置文件在hadoop的配置文件中
YARN_CONF_DIR=$HADOOP_CONF_DIR
# 每个节点启动几个执行器
SPARK_EXECUTOR_CORES=1
# 每个执行器可以使用多大内存
SPARK_EXECUTOR_MEMORY=1800M
# 每个driver可以使用多大内存
SPARK_DRIVER_MEMORY=1800M
# Options for the daemons used in the standalone deploy mode
# - SPARK_MASTER_HOST, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_DAEMON_CLASSPATH, to set the classpath for all daemons
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
# 主节点
SPARK_MASTER_HOST=hadoop01
# 端口,不要和worker的端口重复,通信端口
SPARK_MASTER_PORT=4040
# 界面端口
SPARK_MASTER_WEBUI_PORT=8089
# 每个从节点启动的执行器
SPARK_WORKER_CORES=1
# 每个从节点可使用最大内存
SPARK_WORKER_MEMORY=1800M
# 从节点的通信端口
SPARK_WORKER_PORT=4040
# 界面端口
SPARK_WORKER_WEBUI_PORT=8089
# 从节点工作目录
SPARK_WORKER_DIR=/spark/worker
# Options for launcher
# - SPARK_LAUNCHER_OPTS, to set config properties and Java options for the launcher (e.g. "-Dx=y")
# Generic options for the daemons used in the standalone deploy mode
# - SPARK_CONF_DIR Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - SPARK_LOG_DIR Where log files are stored. (Default: ${SPARK_HOME}/logs)
# - SPARK_LOG_MAX_FILES Max log files of Spark daemons can rotate to. Default is 5.
# - SPARK_PID_DIR Where the pid file is stored. (Default: /tmp)
# - SPARK_IDENT_STRING A string representing this instance of spark. (Default: $USER)
# - SPARK_NICENESS The scheduling priority for daemons. (Default: 0)
# - SPARK_NO_DAEMONIZE Run the proposed command in the foreground. It will not output a PID file.
# Options for native BLAS, like Intel MKL, OpenBLAS, and so on.
# You might get better performance to enable these options if using native BLAS (see SPARK-21305).
# - MKL_NUM_THREADS=1 Disable multi-threading of Intel MKL
# - OPENBLAS_NUM_THREADS=1 Disable multi-threading of OpenBLAS
# spark 的日志目录
SPARK_LOG_DIR=/spark/logs
相关的配置项说明在这Spark Standalone Mode - Spark 3.2.0 Documentation (apache.org)
然后配置workers
接着把这个配置好的文件分发到hadoop02的
/spark目录下
使用
scp -r /spark hadoop02:/spark/
然后在hadoop01上启动(当然如果你在hadoop01上配置那么就可以不需要从hadoop02分发到hadoop01上了)
别忘记
SPARK_HOME
接着切换到
/spark/sbin
目录下,执行启动脚本
启动过程中提示hadoop03还没有相关文件
并且提示我们hadoop02的JAVA_HOME没有设置,这就离谱了,我们第一个配置的就是JAVA_HOME。
错误信息中给出了启动worker的命令,我们直接拷贝到hadoop02上执行看下,发现是可以执行的
接着我们在界面中看下
到worker里面看下,发现和我们配置的一样
查看java进程,发现master已经在hadoop01上启动
hadoop02启动的是worker
现在剩下一个问题,在hadoop01上启动,无法启动hadoop02的worker
我们查看下
start-all.sh
实际上
start-all.sh
启动了
start-master.sh
和
start-worker.sh
重点看
start-worker.sh
可以看到会执行这两个脚本,用于处理配置,我们把java环境在
spark-config.sh
中设置下
然后重新分发并重启
就没有报错,并且在hadoop02上成功启动了
将
/spark
目录分发到hadoop03并进行重启
需要注意,我们配置主节点和从节点的通信端口都是4040,界面访问端口都是8089。这也就是意味着,如果要在hadoop01上启动Worker ,会出现端口冲突。
不过我们有三个节点,本来就是打算让hadoop01成为master的。
开发环境集成
我们开发spark程序是在idea中开发的,所以需要在idea中安装scala的插件
然后在windows环境中配置scala的环境
先把scala的文件从hadoop01上下载到windows中
然后配置环境变量
接着在idea中指定scala的sdk
idea会自动扫描全部的scala的sdk
选择scala目录
有了scala就可以创建spark的项目了,不过还需要scala的打包构建工具,也就是sbt
我们到sbt - The interactive build tool (scala-sbt.org)下载sbt
下载zip包就行
将下载的sbt解压到windows中,并配置环境变量
继续创建项目(scala版本最好选择和我们编译spark相同的版本,当然创建好项目后在修改也是可以的)
根据上面这两篇资料的内容,简单了解下sbt。不希望能做到更好,只要能正常使用就行了。
然后重新打开idea,打开新建的studyspark项目
sbt就开始下载依赖了
此时sbt加载项目会异常的,因为idea默认使用自己的sbt,而不是我们安装的sbt
需要在settings中设置我们自己的sbt
重新构建项目
会弹出sbt-shell(如果你没有把sbt配置的全部勾选,那么应该是不会弹出sbt-shell的,那么这步可以跳过)
/*************************跳过开始*****************************
如果你也出现上述提示,那么表示在等待sbt使用默认的配置,从国外下载依赖
我们在sbt的配置中增加如下参数
-Dsbt.override.build.repos=true
-Dsbt.repository.config=E:\sbt\conf\repo.properties
第一行表示使用全局的仓库配置
第二行则是指定使用的仓库配置的文件路径
记得取消这个勾选
然后重新导入项目
这里发现sbt-shell中文乱码了,很可惜,我尝试了网上找到的方式,都失败了,或许你可以试试,在sbt的参数中增加
-Dfile.encoding=UTF-8
-Dconsold.encoding=GBK
对我都是无效的😆
********************跳过结束*********************
此时项目结构如下
项目相关的配置可以在
build.sbt
中修改
build.sbt如下
// 创建配置,用于标识spark的版本
lazy val sparkVersion = SettingKey[String]("spark-version", "spark version")
// 定义公共的包基础
lazy val packageBaseDir = SettingKey[String]("package-base-dir", "package base dir")
// 抽取公共配置
lazy val commonSettings = Seq(
version := "1.0",
scalaVersion := "2.12.15",
sbtVersion := "1.6.1",
sparkVersion := "3.2.0",
// 定义项目包基础
packageBaseDir := "com.study.spark",
// 定义每个项目的包前缀
ThisProject / idePackagePrefix := Some(packageBaseDir.value + "." + name.value),
// 定义输出目录
ideOutputDirectory := Some(file("target")),
organization := "com.study.spark",
// 申明源码路径
sourceDirectories := Seq(
file("src/main/scala"),
file("src/main/java"),
file("src/test/scala"),
file("src/test/java")
),
// 声明资源路径
resourceDirectories := Seq(
file("src/main/resources"),
file("src/test/resources")
),
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-mllib" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion.value % "provided"
)
)
// 设置根项目
lazy val root = (project in file("."))
.settings(
name := "studyspark",
commonSettings
)
接着我们把
sbt-assembly
插件加入到项目中
等待sbt刷新项目后如下
因为我们在根项目中是不会进行任何编码的,所以我们可以将根项目中的src目录删除
最终我们的代码是需要上传到git的,所以创建
.gitignore
文件,将
target/
排除
我们创建一个helloworld的模块,验证sbt下的scala是否可用
别忘记在build.sbt中定义helloworld项目
lazy val helloworld = (project in file("helloworld"))
.settings(
name := "helloworld",
commonSettings
)
别忘记创建目录
main,test,scala,java,resources
等.
创建好后,我们创建如下代码
在运行之前需要先执行compile操作
你可以在sbt-shell中执行
也可以在sbt工具窗口中执行(我比较喜欢在sbt-shell中操作)
点击运行
但是现在我们只是能在ide中执行,我们的目的是打出jar包的,所以我们还需要配置
sbt-assembly
插件
打开
sbt-assembly
插件的官方文档sbt-assembly (scala-lang.org)
lazy val helloworld = (project in file("helloworld"))
.settings(
name := "helloworld",
// 定义主类
assembly / mainClass := Some(idePackagePrefix.value.get + ".Hello"),
// 定义jar包名 项目名字_scala版本_项目版本.jar
assembly / assemblyJarName := name.value + "_" + scalaVersion.value + "_" + version.value + ".jar",
// 依赖合并策略
assemblyMergeStrategy := {
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "unwanted.txt" => MergeStrategy.discard
case x =>
val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
oldStrategy(x)
},
commonSettings
)
然后执行
assembly
操作,记得切换到helloworld项目下,你也可以在sbt工具窗口执行
不过我喜欢在sbt-shell中执行
然后耐心的等待吧,第一次会比较慢的
可以看到jar包已经生成了
我们使用java -jar 执行
可以看到打出来的jar包成功执行了
到了这里scala和sbt都集成完毕,此时需要集成spark了
spark的依赖下载会比较慢,而且新版本国内的镜像不一定有,需要从国外下载
这里推荐一个Idea的插件:
Big Data Tools
它可以连接到远程的hadoop,hdfs,spark的集群上,用于监控和交互,安装后重启idea会在下面出现工具窗口
支持的还是挺多的
首先我们将插件和hadoop01进行连接
展示如下,和我们在浏览器查看的效果是一样的
连接hdfs,需要注意,填写的hdfs的通信地址,不是浏览器地址
连接上之后,就可以像操作idea里面的项目文件一样,上传下载等
接着我们启动hadoop01的spark集群
可惜了,连接spark还是有问题
到了这里,工具装了一大堆,但是还没开始开发一行代码,接下来就开发一个小案例wordcount试试吧
当我们安装了
big data tools
插件后,新建module的时候,就可以选择
big data tools
下的
spark
模板了
这是一个类似于spring boot的模板,里面自动创建一些文件和配置,更准确应该是和maven比较像
加载完毕后项目结构如下
不过这种创建项目只是适合创建顶级项目,对于子级项目是不行的,会覆盖已有的build.sbt等信息的
我们前面已经搭建好了顶级项目,所以我们可以直接创建一个scala项目就行了
然后指定模块的名字就行了
我们做个优化,把打包的配置,除了主类之外的,都放在公共配置中
// 抽取公共配置
lazy val commonSettings = Seq(
version := "1.0",
scalaVersion := "2.12.15",
sbtVersion := "1.6.1",
sparkVersion := "3.2.0",
// 定义项目包基础
packageBaseDir := "com.study.spark",
// 定义每个项目的包前缀
ThisProject / idePackagePrefix := Some(packageBaseDir.value + "." + name.value),
// 定义输出目录
ideOutputDirectory := Some(file("target")),
organization := "com.study.spark",
// 申明源码路径
sourceDirectories := Seq(
file("src/main/scala"),
file("src/main/java"),
file("src/test/scala"),
file("src/test/java")
),
// 声明资源路径
resourceDirectories := Seq(
file("src/main/resources"),
file("src/test/resources")
),
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-mllib" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion.value % "provided"
),
// 定义jar包名 项目名字_scala版本_项目版本.jar
ThisProject / assembly / assemblyJarName := name.value + "_" + scalaVersion.value + "_" + version.value + ".jar",
// 依赖合并策略
ThisBuild / assemblyMergeStrategy := {
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "unwanted.txt" => MergeStrategy.discard
case x =>
val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
oldStrategy(x)
},
)
然后把wordcount加入到根项目中,主类先不写
lazy val wordcount = (project in file("wordcount"))
.settings(
name := "wordcount",
commonSettings
)
然后创建目录
我们在scala目录下创建主类
首先使用本地进行执行
package com.study.spark.wordcount
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
// 配置spark
val conf = new SparkConf().setMaster("local").setAppName("wordcount")
// 获取sparkContext
val sc = new SparkContext(conf)
// 从本地读取文件,其实就是 build.sbt 文件
sc.textFile("file:///E:\\java\\studyspark\\build.sbt")
// 按照空格拆分
.flatMap(_.split(" "))
// 每个单词计数1
.map(_->1)
// 按单词分组统计
.reduceByKey(_+_)
// 计算
.collect()
// 打印统计结果
.foreach(println(_))
}
}
别忘记在build.sbt中配置主类(之前我们只有一个有效的项目,所以我们可以直接取参数的值,但是现在有了多个项目,就需要在取值的时候,限定作用域)
需要注意的是我们配置的spark依赖是不会在classpath中使用的,因为默认当我们把spark任务的jar包提交给spark集群的时候,spark相关的依赖是不需要在jar包中的。
但是当我们需要本地启动的时候,如果没有spark的依赖,会导致spark相关的类找不到。
所以我们需要在本地启动的spark项目中,再次配置spark的依赖,区别是不写后面
provided
作用域
然后切换到wordcount项目下,编译并执行(使用run执行,而不是使用ide的执行按钮)
如果一切顺利,就会看到执行的日志
我们也可以打包的,使用
sbt-assembly
打包
然后使用java -jar 执行
执行会抛出异常,这是因为缺少了scala的依赖,我们需要把scala的依赖也加入
我们要求scala的依赖每次都必须打包,注意这里是一个
%
打出来的jar包不能简单的使用java -jar 运行,需要提交到spark环境中执行,因为我们在打包的时候并没有打spark相关的包。
为了能在服务器上使用,我们需要修改我们的代码,需要链接到服务器上的spark集群,而且读取文件应该读取hdfs中的文件
首先启动服务器上的hdfs服务
接着启动spark集群
接着把build.sbt上传到hdfs中
然后重新打包,并上传到服务器
然后使用
spark-submit --master spark://hadoop01:4040 --class com.study.spark.wordcount.WordCount /spark/wordcount_2.12.15_1.0.jar
提交到spark集群,spark就开始执行了
执行结果和在本地执行一模一样
而且在spark的界面中也能看到
如果你还没有配置历史服务,那么会启动报错的,提示没有配置历史数据存储路径
拷贝spark-defaults.conf.template为spark-defaults.conf
内容如下
# 是否开启日志
spark.eventLog.enabled true
# 日志存储路径
spark.eventLog.dir hdfs://hadoop01:8020/logs
# 是否压缩
spark.eventLog.compress true
同时在spark-env.sh中配置以下内容
# 配置spark历史服务
SPARK_HISTORY_OPTS="-Dspark.history.ui.port=8086 -Dspark.history.retainedApplications=10 -Dspark.history.fs.logDirectory=hdfs://hadoop01:8020/logs"
我们启动spark历史记录服务
启动历史记录服务不报错
而且界面能打开
然后在执行一次,因为我们的任务很快,所以开启历史记录,方便我们查看job的信息
这时候就能在历史界面查看了
我们实际上是想看看dag图
很漂亮。
源码编译的3.2.0版本无法在window上直接用spark-shell启动
这个问题我发现不仅仅是我们自己编译的二进制分发包有问题,就连官方下载的二进制分发包,如果你直接使用
spark-shell
启动,也是会无法启动的。这个问题困扰我了好几天,我在网上找到了相关的讨论windows - Spark illegal character in path - Stack Overflow
但是也没给出问题原因和解决方案,只是说降低版本。
之前是源码编译二进制包,所以我也有spark的源码,我自己根据堆栈找了好长时间,奈何自己水平太低,从源码中没有找到相关的代码。
不过从我自己理解的源码来看,目前好像就是
spark-shell
的repl的启动存在问题,而提交作业等貌似是没有问题的。(我自己没试)
总结
整个文章的过程比较坎坷,比较艰难。
我是在看书的时候,知道了scala的sdk的二进制不兼容,以及目前市面上大多数公司使用cdh的发行包,于是我也到cdh的官网想下载最新的发行包,结果发现cdh版本貌似也开始收费了,于是只能自己使用源码编译。
源码编译时第一个阶段,当我历经艰难,完成了源码编译后,发现不能在windows上使用spark-shell,于是在服务器集群上搭建spark集群。这里主要学习了spark的启动和集群部署,以及一些配置等。
上述这两个阶段基本上是年前完成的,在搭建spark开发环境的时候,才知道spark或者说scala推荐使用sbt编译工具,可怜我maven都不是很熟悉,我哪会sbt呢。没辙,想办法研究sbt,研究sbt刚开始,就过年了,中间大概10多天吧,就是啥都没做,新年开工,才开始正式研究sbt,研究几天,基本上做到了入门,就继续在idea中集成spark,sbt环境。
最终实现了idea本地开发编码,本地在sbt的基础上执行,然后打包提交到spark集群中执行。