天天看點

大資料-spark概述1.  Spark概述2.  Spark叢集安裝3.  執行Spark程式

1.  Spark概述

1.1.   什麼是Spark(官網:http://spark.apache.org)

Spark是一種快速、通用、可擴充的大資料分析引擎。目前,Spark生态系統已經包含多個子項目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項目,Spark是基于記憶體計算的大資料并行計算架構。Spark基于記憶體計算,提高了在大資料環境下資料處理的實時性,同時保證了高容錯性和高可伸縮性,允許使用者将Spark部署在大量廉價硬體之上,形成叢集。

1.2.   為什麼要學Spark

中間結果輸出: Spark是MapReduce的替代方案,而且相容HDFS、Hive,可融入Hadoop的生态系統,以彌補MapReduce的不足。

1.3.  Spark特點

1.3.1.  快

DAG(計算路徑的有向無環圖)執行引擎,可以通過基于記憶體來高效處理資料流。

1.3.2.  易用

Spark支援Java、Python和Scala的API,還支援超過80種進階算法,使使用者可以快速建構不同的應用。而且Spark支援互動式的Python和Scala的shell,可以非常友善地在這些shell中使用Spark叢集來驗證解決問題的方法。

1.3.3.  通用

Spark提供了統一的解決方案。Spark可以用于批處理、互動式查詢(SparkSQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。

2.  Spark叢集安裝

2.1.  安裝

2.1.1.   機器部署

準備兩台以上Linux伺服器,安裝好JDK

2.1.2.  下載下傳Spark安裝包

上傳spark-安裝包到Linux上

解壓安裝包到指定位置

tar -zxvf spark-2.1.0-bin-hadoop2.6.tgz -C/usr/local

2.1.3.  配置Spark

進入到Spark安裝目錄

cd /usr/local/spark-2.1.0-bin-hadoop2.6

進入conf目錄并重命名并修改spark-env.sh.template檔案

cd conf/

mv spark-env.sh.template spark-env.sh

vi spark-env.sh

在該配置檔案中添加如下配置

export JAVA_HOME=/usr/java/jdk1.8.0_111

export SPARK_MASTER_IP=node1.edu360.cn

export SPARK_MASTER_PORT=7077

儲存退出

重命名并修改slaves.template檔案

mv slaves.template slaves

vi slaves

在該檔案中添加子節點所在的位置(Worker節點)

node2.edu360.cn

node3.edu360.cn

node4.edu360.cn

儲存退出

将配置好的Spark拷貝到其他節點上

scp -r spark-2.1.0-bin-hadoop2.6/node2.edu360.cn:/usr/local/

scp -r spark-2.1.0-bin-hadoop2.6/node3.edu360.cn:/usr/local/

scp -r spark-2.1.0-bin-hadoop2.6/node4.edu360.cn:/usr/local/

Spark叢集配置完畢,目前是1個Master,3個Work,在node1.edu360.cn上啟動Spark叢集

/usr/local/spark-2.1.0-bin-hadoop2.6/sbin/start-all.sh

啟動後執行jps指令,主節點上有Master程序,其他子節點上有Work進行,登入Spark管理界面檢視叢集狀态(主節點):http://node1.edu360.cn:8080/

到此為止,Spark叢集安裝完畢,但是有一個很大的問題,那就是Master節點存在單點故障,要解決此問題,就要借助zookeeper,并且啟動至少兩個Master節點來實作高可靠,配置方式比較簡單:

Spark叢集規劃:node1,node2是Master;node3,node4,node5是Worker

安裝配置zk叢集,并啟動zk叢集

停止spark所有服務,修改配置檔案spark-env.sh,在該配置檔案中删掉SPARK_MASTER_IP并添加如下配置

exportSPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER-Dspark.deploy.zookeeper.url=zk1,zk2,zk3-Dspark.deploy.zookeeper.dir=/spark"

1.在node1節點上修改slaves配置檔案内容指定worker節點

2.在node1上執行sbin/start-all.sh腳本,然後在node2上執行sbin/start-master.sh啟動第二個Master

3.  執行Spark程式

3.1.  執行第一個spark程式

/usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-submit\

--class org.apache.spark.examples.SparkPi \

--master spark://node1.edu360.cn:7077 \

--executor-memory 1G \

--total-executor-cores 2 \

/usr/local/spark-2.1.0-bin-hadoop2.6/lib/spark-examples-2.1.0-hadoop2.6.0.jar\

100

該算法是利用蒙特·卡羅算法求PI

3.2.  啟動Spark Shell

spark-shell是Spark自帶的互動式Shell程式,友善使用者進行互動式程式設計,使用者可以在該指令行下用scala編寫spark程式。

3.2.1.  啟動spark shell

/usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-shell\

--master spark://node1.edu360.cn:7077 \

--executor-memory 2g \

--total-executor-cores 2

參數說明:

--masterspark://node1.edu360.cn:7077 指定Master的位址

--executor-memory 2g 指定每個worker可用記憶體為2G

--total-executor-cores 2 指定整個叢集使用的cup核數為2個

注意:

如果啟動spark shell時沒有指定master位址,但是也可以正常啟動spark shell和執行spark shell中的程式,其實是啟動了spark的local模式,該模式僅在本機啟動一個程序,沒有與叢集建立聯系。

Spark Shell中已經預設将SparkContext類初始化為對象sc。使用者代碼如果需要用到,則直接應用sc即可

3.2.2.  在spark shell中編寫WordCount程式

1.首先啟動hdfs

2.向hdfs上傳一個檔案到hdfs://node1.edu360.cn:9000/words.txt

3.在spark shell中用scala語言編寫spark程式

sc.textFile("hdfs://node1.edu360.cn:9000/words.txt").flatMap(_.split(""))

.map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://node1.edu360.cn:9000/out")

4.使用hdfs指令檢視結果

hdfs dfs -ls hdfs://node1.edu360.cn:9000/out/p*

說明:

sc是SparkContext對象,該對象時送出spark程式的入口

textFile(hdfs://node1.edu360.cn:9000/words.txt)是hdfs中讀取資料

flatMap(_.split(" "))先map在壓平

map((_,1))将單詞和1構成元組

reduceByKey(_+_)按照key進行reduce,并将value累加

saveAsTextFile("hdfs://node1.edu360.cn:9000/out")将結果寫入到hdfs中

3.3.  在IDEA中編寫WordCount程式

spark shell僅在測試和驗證我們的程式時使用的較多,在生産環境中,通常會在IDE中編制程式,然後打成jar包,然後送出到叢集,最常用的是建立一個Maven項目,利用Maven來管理jar包的依賴。

1.建立一個項目

2.選擇Maven項目,然後點選next

3.填寫maven的GAV,然後點選next

4.填寫項目名稱,然後點選finish

5.建立好maven項目後,點選Enable Auto-Import

6.配置Maven的pom.xml

<properties>

    <maven.compiler.source>1.8</maven.compiler.source>

    <maven.compiler.target>1.8</maven.compiler.target>

    <scala.version>2.11.8</scala.version>

    <spark.version>2.2.0</spark.version>

    <hadoop.version>2.8.0</hadoop.version>

    <encoding>UTF-8</encoding>

</properties>

<dependencies>

    <!-- 導入scala的依賴-->

    <dependency>

        <groupId>org.scala-lang</groupId>

        <artifactId>scala-library</artifactId>

        <version>${scala.version}</version>

    </dependency>

    <!-- 導入spark的依賴-->

    <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-core_2.11</artifactId>

        <version>${spark.version}</version>

    </dependency>

    <!-- 指定hadoop-client API的版本-->

    <dependency>

        <groupId>org.apache.hadoop</groupId>

        <artifactId>hadoop-client</artifactId>

        <version>${hadoop.version}</version>

    </dependency>

</dependencies>

<build>

    <pluginManagement>

        <plugins>

            <!-- 編譯scala的插件-->

            <plugin>

                <groupId>net.alchim31.maven</groupId>

                <artifactId>scala-maven-plugin</artifactId>

                <version>3.2.2</version>

            </plugin>

            <!-- 編譯java的插件-->

            <plugin>

                <groupId>org.apache.maven.plugins</groupId>

                <artifactId>maven-compiler-plugin</artifactId>

                <version>3.5.1</version>

            </plugin>

        </plugins>

    </pluginManagement>

    <plugins>

        <plugin>

            <groupId>net.alchim31.maven</groupId>

            <artifactId>scala-maven-plugin</artifactId>

            <executions>

                <execution>

                    <id>scala-compile-first</id>

                    <phase>process-resources</phase>

                    <goals>

                        <goal>add-source</goal>

                        <goal>compile</goal>

                    </goals>

                </execution>

                <execution>

                    <id>scala-test-compile</id>

                    <phase>process-test-resources</phase>

                    <goals>

                        <goal>testCompile</goal>

                    </goals>

                </execution>

            </executions>

        </plugin>

        <plugin>

            <groupId>org.apache.maven.plugins</groupId>

            <artifactId>maven-compiler-plugin</artifactId>

            <executions>

                <execution>

                    <phase>compile</phase>

                    <goals>

                        <goal>compile</goal>

                    </goals>

                </execution>

            </executions>

        </plugin>

        <!-- 打jar插件-->

        <plugin>

            <groupId>org.apache.maven.plugins</groupId>

            <artifactId>maven-shade-plugin</artifactId>

            <version>2.4.3</version>

            <executions>

                <execution>

                    <phase>package</phase>

                    <goals>

                        <goal>shade</goal>

                    </goals>

                    <configuration>

                        <filters>

                            <filter>

                                <artifact>*:*</artifact>

                                <excludes>

                                    <exclude>META-INF/*.SF</exclude>

                                    <exclude>META-INF/*.DSA</exclude>

                                    <exclude>META-INF/*.RSA</exclude>

                                </excludes>

                            </filter>

                        </filters>

                    </configuration>

                </execution>

            </executions>

        </plugin>

    </plugins>

</build>

7.建立一個scala class,類型為Object

8.編寫spark程式

val conf =new SparkConf().setAppName("WordCount").setMaster("local[4]")

//sparkContext是spark程式執行的入口

val sc =new SparkContext(conf)

//使用SparkContext建立RDD

//告訴spark以後從哪裡讀取資料(不會立即讀取資料,是lazy)

//sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).sortBy(_._2, false).saveAsTextFile(args(1))

val lines: RDD[String] = sc.textFile(args(0))

//切分壓平資料

val words: RDD[String] = lines.flatMap(_.split(" "))

//将單詞和一組合在一起

val wordAndOne: RDD[(String, Int)] = words.map((_,1))

//按key進行聚合

val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)

//排序

val sorted: RDD[(String, Int)] = reduced.sortBy(_._2,false)

//産生結果(将資料儲存到hdfs中)

sorted.saveAsTextFile(args(1))

//釋放資源

sc.stop()

9.       

10.使用Maven打包:首先修改pom.xml中的main class

點選idea右側的MavenProject選項

點選Lifecycle,選擇clean和package,然後點選Run Maven Build

11.選擇編譯成功的jar包,并将該jar上傳到Spark叢集中的某個節點上

12.首先啟動hdfs和Spark叢集

啟動hdfs

/usr/local/hadoop-2.6.5/sbin/start-dfs.sh

啟動spark

/usr/local/spark-2.1.0-bin-hadoop2.6/sbin/start-all.sh

13.使用spark-submit指令送出Spark應用(注意參數的順序)

/usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-submit\

--class cn.itcast.spark.WordCount \

--master spark://node1.edu360.cn:7077 \

--executor-memory 2G \

--total-executor-cores 4 \

/root/spark-mvn-1.0-SNAPSHOT.jar \

hdfs://node1.edu360.cn:9000/words.txt \

hdfs://node1.edu360.cn:9000/out

檢視程式執行結果

hdfs dfs -cathdfs://node1.edu360.cn:9000/out/part-00000

繼續閱讀