天天看點

Spark入門實戰指南——Spark生态圈+第一個Spark程式一、Spark及其生态圈簡介二、Spark的WordCount程式

一、Spark及其生态圈簡介

1.目前大資料處理場景有以下幾個類型:

1.  複雜的批量處理(BatchData Processing),偏重點在于處理海量資料的能力,至于處理速度可忍受,通常的時間可能是在數十分鐘到數小時;

2.  基于曆史資料的互動式查詢(Interactive Query),通常的時間在數十秒到數十分鐘之間

3. 基于實時資料流的資料處理(Streaming Data Processing),通常在數百毫秒到數秒之間

目前對以上三種場景需求都有比較成熟的處理架構,第一種情況可以用Hadoop的MapReduce來進行批量海量資料處理,第二種情況可以Impala進行互動式查詢,對于第三中情況可以用Storm分布式處理架構處理實時流式資料。以上三者都是比較獨立,各自一套維護成本比較高,而Spark的出現能夠一站式平台滿意以上需求。

2.生态系統

2.1 Spark Core

2.2 SparkStreaming

2.3 Spark SQL

Shark即Hive on Spark,本質上是通過Hive的HQL解析,把HQL翻譯成Spark上的RDD操作,然後通過Hive的metadata擷取資料庫裡的表資訊,實際HDFS上的資料和檔案,會由Shark擷取并放到Spark上運算。Shark的最大特性就是快和與Hive的完全相容,且可以在shell模式下使用rdd2sql()這樣的API,把HQL得到的結果集,繼續在scala環境下運算,支援自己編寫簡單的機器學習或簡單分析處理函數,對HQL結果進一步分析計算。

2.4 BlinkDB

和傳統關系型資料庫不同,BlinkDB是一個很有意思的互動式查詢系統,就像一個跷跷闆,使用者需要在查詢精度和查詢時間上做一權衡;如果使用者想更快地擷取查詢結果,那麼将犧牲查詢結果的精度;同樣的,使用者如果想擷取更高精度的查詢結果,就需要犧牲查詢響應時間。使用者可以在查詢的時候定義一個失誤邊界。

2.5 MLBase/MLlib

MLBase分為四部分:MLlib、MLI、ML Optimizer和MLRuntime。

l  MLOptimizer會選擇它認為最适合的已經在内部實作好了的機器學習算法和相關參數,來處理使用者輸入的資料,并傳回模型或别的幫助分析的結果;

l  MLI 是一個進行特征抽取和進階ML程式設計抽象的算法實作的API或平台;

l MLlib是Spark實作一些常見的機器學習算法和實用程式,包括分類、回歸、聚類、協同過濾、降維以及底層優化,該算法可以進行可擴充; MLRuntime 基于Spark計算架構,将Spark的分布式計算應用到機器學習領域。

總的來說,MLBase的核心是他的優化器,把聲明式的Task轉化成複雜的學習計劃,産出最優的模型和計算結果。與其他機器學習Weka和Mahout不同的是:

l  MLBase是分布式的,Weka是一個單機的系統;

l  MLBase是自動化的,Weka和Mahout都需要使用者具備機器學習技能,來選擇自己想要的算法和參數來做處理;

l  MLBase提供了不同抽象程度的接口,讓算法可以擴充

l  MLBase基于Spark這個平台

2.6 GraphX

GraphX是Spark中用于圖和圖并行計算的API

2.7 SparkR

SparkR是AMPLab釋出的一個R開發包,使得R擺脫單機運作的命運,可以作為Spark的job運作在叢集上,極大得擴充了R的資料處理能力

2.8 Tachyon

Tachyon是一個高容錯的分布式檔案系統,允許檔案以記憶體的速度在叢集架構中進行可靠的共享,就像Spark和 MapReduce那樣。通過利用資訊繼承,記憶體侵入,Tachyon獲得了高性能。

二、Spark的WordCount程式

安裝:安裝Idea、安裝本地JAVA 8 和Scala 2.10.4、為IDEA安裝Scala ①建立項目,指定JDK1.80.X和Scala2.10.4 ②設定工程的Libraries核心是添加Spark的jar (File-->Project Structure-->libraries-->加号(java)-->spark-1.6.0-bin-hadoop2.6-->lib-->spark-assembly-1.6.0-hadoop2.6.0.jar--apply-->ok)

package idea.dt.spark

import org.apache.spark.{SparkConf, SparkContext}

/**
  * (本地測試的程式)
  * 統計一個檔案中出現的單詞計數
  * @author zzh
  * @time	 2016/8/23
  */

object WordCount {
  def main(args: Array[String]) {

    /*
     * 第一步:建立Spark的配置對象SparkConf,設定Spark程式運作時的配置資訊
     * 例如說通過setMaster來設定程式焰蓮劫的Spark叢集的Master的URL,如果設定為
     * LOCAL,則代表Spark程式在本地運作,特别适合與機器配置條件差的初學者
     */
    val conf = new SparkConf() //建立SparkConf對象
    conf.setAppName("Wow,My First Spark App") //設定程式的名稱,在程式運作的監控界面可以看到名稱
    conf.setMaster("local") //程式本地運作不需要Spark叢集

    /*
     *第二步:建立SparkContext對象<——Spark程式所有功能唯一的入口(無論是Scala、Java、Python、R都有一個SparkContext)
     * SparkContext的核心作用:初始化spark應用程式運作搜徐的核心元件,包括DAGScheduler、TaskScheduler、schedulerBackend
     * 同時還會負責Spark程式往Master注冊程式等
     */
    val sc = new SparkContext(conf) //建立SparkContext對象,通過傳入的SparkConf執行個體來定制Spark運作的具體參數和配置資訊

    /*
     * 第三步:根據具體的資料來源(HDFS、Hbase、Local FS、DB、S3等)通過SparkContext來建立RDD
     * RDD的建立基本三種方式:通過外部的資料源(例如:HDFS)、根據Scala集合、由其他的RDD操作
     * 資料會被RDD劃分成一系列的Partitions,配置設定到沒法Partition的資料與一個資料Task的處理範疇
     */
    //并行度隻有一個
    //    val lines:RDD[String] = sc.textFile("F:\\深入了解大資料Software\\大資料壓縮檔案\\spark-1.6.0-bin-hadoop2.6\\spark-1.6.0-bin-hadoop2.6\\README.md", 1)
    val lines = sc.textFile("F:\\深入了解大資料Software\\大資料壓縮檔案\\spark-1.6.0-bin-hadoop2.6\\spark-1.6.0-bin-hadoop2.6\\README.md", 1)

    /*
     * 第四步:對初始的RDD進行Transformation級别的處理,例如map、filter等高階函數等的程式設計,來進行具體資料計算
     */
    //4.1對每一行進行拆分
    val words = lines.flatMap { line => line.split(" ") }
    //4.2對每個單詞計數為1,
    val pairs = words.map { word => (word, 1) }
    //4.3統計每個單詞在檔案中的總數
    val wordCountsByOrder = pairs.reduceByKey(_ + _).map(pair=>(pair._2,pair._1)).sortByKey(false).map(pair=>(pair._2,pair._1))//對相同的Key,進行Value的累積(包括Local和Reduce級别的同時Reduce)

    wordCountsByOrder.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2) )
    wordCountsByOrder.saveAsTextFile()

    sc.stop()
  }
}
           

繼續閱讀