1. Spark概述
1.1. 什麼是Spark(官網:http://spark.apache.org)
Spark是一種快速、通用、可擴充的大資料分析引擎。目前,Spark生态系統已經包含多個子項目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項目,Spark是基于記憶體計算的大資料并行計算架構。Spark基于記憶體計算,提高了在大資料環境下資料處理的實時性,同時保證了高容錯性和高可伸縮性,允許使用者将Spark部署在大量廉價硬體之上,形成叢集。
1.2. 為什麼要學Spark
中間結果輸出: Spark是MapReduce的替代方案,而且相容HDFS、Hive,可融入Hadoop的生态系統,以彌補MapReduce的不足。
1.3. Spark特點
1.3.1. 快
DAG(計算路徑的有向無環圖)執行引擎,可以通過基于記憶體來高效處理資料流。
1.3.2. 易用
Spark支援Java、Python和Scala的API,還支援超過80種進階算法,使使用者可以快速建構不同的應用。而且Spark支援互動式的Python和Scala的shell,可以非常友善地在這些shell中使用Spark叢集來驗證解決問題的方法。
1.3.3. 通用
Spark提供了統一的解決方案。Spark可以用于批處理、互動式查詢(SparkSQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。
2. Spark叢集安裝
2.1. 安裝
2.1.1. 機器部署
準備兩台以上Linux伺服器,安裝好JDK
2.1.2. 下載下傳Spark安裝包
上傳spark-安裝包到Linux上
解壓安裝包到指定位置
tar -zxvf spark-2.1.0-bin-hadoop2.6.tgz -C/usr/local
2.1.3. 配置Spark
進入到Spark安裝目錄
cd /usr/local/spark-2.1.0-bin-hadoop2.6
進入conf目錄并重命名并修改spark-env.sh.template檔案
cd conf/
mv spark-env.sh.template spark-env.sh
vi spark-env.sh
在該配置檔案中添加如下配置
export JAVA_HOME=/usr/java/jdk1.8.0_111
export SPARK_MASTER_IP=node1.edu360.cn
export SPARK_MASTER_PORT=7077
儲存退出
重命名并修改slaves.template檔案
mv slaves.template slaves
vi slaves
在該檔案中添加子節點所在的位置(Worker節點)
node2.edu360.cn
node3.edu360.cn
node4.edu360.cn
儲存退出
将配置好的Spark拷貝到其他節點上
scp -r spark-2.1.0-bin-hadoop2.6/node2.edu360.cn:/usr/local/
scp -r spark-2.1.0-bin-hadoop2.6/node3.edu360.cn:/usr/local/
scp -r spark-2.1.0-bin-hadoop2.6/node4.edu360.cn:/usr/local/
Spark叢集配置完畢,目前是1個Master,3個Work,在node1.edu360.cn上啟動Spark叢集
/usr/local/spark-2.1.0-bin-hadoop2.6/sbin/start-all.sh
啟動後執行jps指令,主節點上有Master程序,其他子節點上有Work進行,登入Spark管理界面檢視叢集狀态(主節點):http://node1.edu360.cn:8080/
到此為止,Spark叢集安裝完畢,但是有一個很大的問題,那就是Master節點存在單點故障,要解決此問題,就要借助zookeeper,并且啟動至少兩個Master節點來實作高可靠,配置方式比較簡單:
Spark叢集規劃:node1,node2是Master;node3,node4,node5是Worker
安裝配置zk叢集,并啟動zk叢集
停止spark所有服務,修改配置檔案spark-env.sh,在該配置檔案中删掉SPARK_MASTER_IP并添加如下配置
exportSPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER-Dspark.deploy.zookeeper.url=zk1,zk2,zk3-Dspark.deploy.zookeeper.dir=/spark"
1.在node1節點上修改slaves配置檔案内容指定worker節點
2.在node1上執行sbin/start-all.sh腳本,然後在node2上執行sbin/start-master.sh啟動第二個Master
3. 執行Spark程式
3.1. 執行第一個spark程式
/usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-submit\
--class org.apache.spark.examples.SparkPi \
--master spark://node1.edu360.cn:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/usr/local/spark-2.1.0-bin-hadoop2.6/lib/spark-examples-2.1.0-hadoop2.6.0.jar\
100
該算法是利用蒙特·卡羅算法求PI
3.2. 啟動Spark Shell
spark-shell是Spark自帶的互動式Shell程式,友善使用者進行互動式程式設計,使用者可以在該指令行下用scala編寫spark程式。
3.2.1. 啟動spark shell
/usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-shell\
--master spark://node1.edu360.cn:7077 \
--executor-memory 2g \
--total-executor-cores 2
參數說明:
--masterspark://node1.edu360.cn:7077 指定Master的位址
--executor-memory 2g 指定每個worker可用記憶體為2G
--total-executor-cores 2 指定整個叢集使用的cup核數為2個
注意:
如果啟動spark shell時沒有指定master位址,但是也可以正常啟動spark shell和執行spark shell中的程式,其實是啟動了spark的local模式,該模式僅在本機啟動一個程序,沒有與叢集建立聯系。
Spark Shell中已經預設将SparkContext類初始化為對象sc。使用者代碼如果需要用到,則直接應用sc即可
3.2.2. 在spark shell中編寫WordCount程式
1.首先啟動hdfs
2.向hdfs上傳一個檔案到hdfs://node1.edu360.cn:9000/words.txt
3.在spark shell中用scala語言編寫spark程式
sc.textFile("hdfs://node1.edu360.cn:9000/words.txt").flatMap(_.split(""))
.map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://node1.edu360.cn:9000/out")
4.使用hdfs指令檢視結果
hdfs dfs -ls hdfs://node1.edu360.cn:9000/out/p*
說明:
sc是SparkContext對象,該對象時送出spark程式的入口
textFile(hdfs://node1.edu360.cn:9000/words.txt)是hdfs中讀取資料
flatMap(_.split(" "))先map在壓平
map((_,1))将單詞和1構成元組
reduceByKey(_+_)按照key進行reduce,并将value累加
saveAsTextFile("hdfs://node1.edu360.cn:9000/out")将結果寫入到hdfs中
3.3. 在IDEA中編寫WordCount程式
spark shell僅在測試和驗證我們的程式時使用的較多,在生産環境中,通常會在IDE中編制程式,然後打成jar包,然後送出到叢集,最常用的是建立一個Maven項目,利用Maven來管理jar包的依賴。
1.建立一個項目
2.選擇Maven項目,然後點選next
3.填寫maven的GAV,然後點選next
4.填寫項目名稱,然後點選finish
5.建立好maven項目後,點選Enable Auto-Import
6.配置Maven的pom.xml
<properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <scala.version>2.11.8</scala.version> <spark.version>2.2.0</spark.version> <hadoop.version>2.8.0</hadoop.version> <encoding>UTF-8</encoding> </properties> <dependencies> <!-- 導入scala的依賴--> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- 導入spark的依賴--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- 指定hadoop-client API的版本--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies> <build> <pluginManagement> <plugins> <!-- 編譯scala的插件--> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> </plugin> <!-- 編譯java的插件--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> </plugin> </plugins> </pluginManagement> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <executions> <execution> <id>scala-compile-first</id> <phase>process-resources</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> <execution> <id>scala-test-compile</id> <phase>process-test-resources</phase> <goals> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <executions> <execution> <phase>compile</phase> <goals> <goal>compile</goal> </goals> </execution> </executions> </plugin> <!-- 打jar插件--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> </plugins> </build> |
7.建立一個scala class,類型為Object
8.編寫spark程式
val conf =new SparkConf().setAppName("WordCount").setMaster("local[4]") //sparkContext是spark程式執行的入口 val sc =new SparkContext(conf) //使用SparkContext建立RDD //告訴spark以後從哪裡讀取資料(不會立即讀取資料,是lazy) //sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).sortBy(_._2, false).saveAsTextFile(args(1)) val lines: RDD[String] = sc.textFile(args(0)) //切分壓平資料 val words: RDD[String] = lines.flatMap(_.split(" ")) //将單詞和一組合在一起 val wordAndOne: RDD[(String, Int)] = words.map((_,1)) //按key進行聚合 val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_) //排序 val sorted: RDD[(String, Int)] = reduced.sortBy(_._2,false) //産生結果(将資料儲存到hdfs中) sorted.saveAsTextFile(args(1)) //釋放資源 sc.stop() |
10.使用Maven打包:首先修改pom.xml中的main class
點選idea右側的MavenProject選項
點選Lifecycle,選擇clean和package,然後點選Run Maven Build
11.選擇編譯成功的jar包,并将該jar上傳到Spark叢集中的某個節點上
12.首先啟動hdfs和Spark叢集
啟動hdfs
/usr/local/hadoop-2.6.5/sbin/start-dfs.sh
啟動spark
/usr/local/spark-2.1.0-bin-hadoop2.6/sbin/start-all.sh
13.使用spark-submit指令送出Spark應用(注意參數的順序)
/usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-submit\
--class cn.itcast.spark.WordCount \
--master spark://node1.edu360.cn:7077 \
--executor-memory 2G \
--total-executor-cores 4 \
/root/spark-mvn-1.0-SNAPSHOT.jar \
hdfs://node1.edu360.cn:9000/words.txt \
hdfs://node1.edu360.cn:9000/out
檢視程式執行結果
hdfs dfs -cathdfs://node1.edu360.cn:9000/out/part-00000