天天看點

【spark 深入學習 05】RDD程式設計之旅基礎篇-01

----------------

本節内容

1.RDD的工作流程

2.WordCount解說

 · shell版本WordCount

 · java版本WordCount

一、RDD工作流程

   1. RDD是spark特有的資料模型,談到RDD就會提到什麼彈性分布式資料集,什麼有向無環圖,本文暫時不去展開這些高深概念,在閱讀本文時候,大家可以就把RDD當作一個數組,這樣的了解對我們學習RDD的API是非常有幫助的。本文所有示例代碼都是使用scala語言編寫的。RDD的執行過程如下:

· 從外部資料建立出輸入RDD,或者從驅動程式分發驅動程式中的對象集合

· 對RDD進行轉化,一個RDD轉化為一個新的RDD,如filter()操作

· 如果需要重用,告知RDD執行persist()操作

· 執行action觸發計算并行計算,spark先優化再執行計算,如count()和first()

  RDD的建立有2種方式

(1)從驅動程式分發驅動程式中的對象集合

從記憶體裡構造RDD,使用的方法:makeRDD和parallelize方法  

----------------------- 

val rdd01 = sc.makeRDD(List(1,2,3,4,5,6));

val r01 = rdd01.map { x => x * x }

println(r01.collect().mkString(","))

/* Array */

val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))

val r02 = rdd02.filter { x => x < 5}

println(r02.collect().mkString(","))

val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)

val r03 = rdd03.map { x => x + 1 }

println(r03.collect().mkString(","))

val rdd04 = sc.parallelize(Array(1,2,3,4,5,6) ,1)

val r04 = rdd04.filter { x => x > 3 }

println(r04.collect().mkString(","))

 ----------------------- 

2.makeRDD和parallelize的差別

  makeRDD有兩種實作方式,第一種方式parallelize聲明都一樣,接收的參數和parallelize完全一樣,def makeRDD[T:ClassTag],這種實作方式的makeRDD依賴了parallelize;makeRDD第二種實作方式defmakeRDD[T:ClassTag](T,Seq(String)))

第一種:mkRDD實作方式

val blog1=sc.parallelize(List(1,2,3));

val blog2=sc.makeRDD(List(1,2,3));

第二種:mkRDD實作方式

valseq=List((1,List("a","b","c")),(2,List("aa","bb","cc")));

val blog3=sc.makeRDD(seq);

blog3.preferredLocations(blog3.partitions(0));

blog3.preferredLocations(blog3.partitions(1));

WordCount

  WordCount是分布式程式設計的入門示例,本節也從WordCount舉例說明RDD DEMO

1.Spark shell版本

---------------------------------------------------

//加載hdfs上的檔案

val txtFile ="/tmp/test/core-site.xml" ;       

val txtData = sc.textFile(txtFile);

//将上一步生成的RDD對象儲存到緩存中,在此之後Spark就不需要在每次資料查詢時都重新計算

txtData.cache()    ;

// flatMap先映射後扁平化,

val wcData = txtData.flatMap(l =>l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _);    

//可以提取出所有rdd裡的資料項,逐行輸出

wcData.collect().foreach(println);   

備注:

A.     關于spark-shell的啟動參數指定

bin/spark-shell --executor-memory 1G --total-executor-cores10 --executor-cores 1 --master yarn-client  --driver-class-path /usr/local/tdr_hadoop/spark/spark-1.6.0-bin-hadoop2.6/lib/mysql-connector-java-5.1.40-bin.jar

--executor-memory: 指定每個executor(執行器)占用的記憶體 

--total-executor-cores: 所有executor總共使用的cpu核數 

--executor-cores:每個executor使用的cpu核數 

--driver-class-path:指定要加載的jar包

--master:

local[8]:表示在本地運作,資料會下載下傳到接口機本地來執行,單機版

spark://master01:7077:表示在叢集上運作應用程式,指定任務送出的叢集路徑在哪裡。這就需要提前啟動一個真實的Standalone叢集。可以指定多個master的位址,用逗号隔開。

yarn-client:在客戶模式上,driver與送出程式的用戶端在一個程序

【spark 深入學習 05】RDD程式設計之旅基礎篇-01

yarn-cluster:在叢集模式上,driver是從叢集中的一個worker程序中啟動的,這個程序隻要完成了送出作業任務就會退出,不會等待送出的應用程式的完成。Spark-shell時,必須使用yarn-client模式,因為你要在client上寫SQL。

B.spark-shell 是一個spark application,運作時需要向資料總管申請資源,如standalone spark、YARN、Mesos。本例向standalone spark申請資源,是以在運作spark-shell時需要指向申請資源的standalonespark叢集資訊,其參數為MASTER。

如果未在spark-env.sh中申明MASTER,則使用指令MASTER=spark://cdh1:7077bin/spark-shell啟動;如果已經在spark-env.sh中申明MASTER,則可以直接用bin/spark-shell啟動。

由于spark-shell預設的情況下,會申請所有的CPU資源

B.     Spark每次Executor執行任務情況

【spark 深入學習 05】RDD程式設計之旅基礎篇-01
【spark 深入學習 05】RDD程式設計之旅基礎篇-01
【spark 深入學習 05】RDD程式設計之旅基礎篇-01

2. java 版本

搭建Spark開發環境

(1)前提:配置好jdk和scala到windows

(2)安裝Intellij去官網下載下傳Intellij:https://www.jetbrains.com/idea/,在windows環境下輕按兩下安裝即可

(3)安裝scala插件

步驟如下圖所示,安裝好scala插件後,點選restart重新開機intellij

【spark 深入學習 05】RDD程式設計之旅基礎篇-01

(4)、使用Intellij寫WordCount代碼

a.建立scala工程

  File -> new  -> project  -> scala project –>scala,項目名稱:spark02

在src目錄下,建立cn.com包,在該包下建立object 類,命名為word,完成word.scala代碼如下所示:

---------------------------------------------------------------------package cn.com

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

/**

  * Created by Administrator on 2016/11/2.

  */object word {

  def main(args: Array[String]) {

    if(args.length < 1) {

      System.err.println("Usage: <file>")

      System.exit(1)

    }

    val conf = new SparkConf()

    val sc = new SparkContext(conf)

    //SparkContext 是把代碼送出到叢集或者本地的通道,我們編寫Spark代碼,無論是要本地運作還是叢集運作都必須有SparkContext的執行個體

    val line = sc.textFile(args(0))

    //把讀取的内容儲存給line變量,其實line是一個MappedRDD,Spark的所有操作都是基于RDD的

    line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect.foreach(println)

    sc.stop

  }

}

---------------------------------------------------------------------

b.導入spark包

 File

->Project structure

->project settting

->libraries->+

【spark 深入學習 05】RDD程式設計之旅基礎篇-01

導入spark-assembly-1.6.0-hadoop2.6.0.jar包(該包從spark安裝包的lib下獲得)

c.選擇Artifacts

File

->Artifacts->+,選擇要導入的項目,以及main類

【spark 深入學習 05】RDD程式設計之旅基礎篇-01

并且指定jar包輸出的位置

【spark 深入學習 05】RDD程式設計之旅基礎篇-01
【spark 深入學習 05】RDD程式設計之旅基礎篇-01

d.輸出jar包

Build -> Build ArtiFacts ->build,打好jar包到:D:\spark02\out\artifacts\spark02_jar\spark02.jar

e.上傳jar包到spark用戶端,并執行

  執行指令:

spark-submit --master yarn --executor-memory 1000M /usr/local/tdr_hadoop/spark/spark02.jarhdfs://tdrHadoop/tmp/test/core-site.xml

在yarn的前台顯示正在執行

【spark 深入學習 05】RDD程式設計之旅基礎篇-01

執行結果輸出:

【spark 深入學習 05】RDD程式設計之旅基礎篇-01

繼續閱讀