天天看點

大資料系列之并行計算引擎Spark介紹

大資料系列之并行計算引擎Spark介紹

spark:

apache spark 是專為大規模資料處理而設計的快速通用的計算引擎。

spark是uc berkeley amp lab (加州大學伯克利分校的amp實驗室)所開源的類hadoop mapreduce的通用并行架構,spark擁有hadoop mapreduce所具有的優點;但不同于mapreduce的是job中間輸出結果可以儲存在記憶體中,進而不再需要讀寫hdfs,是以spark能更好地适用于資料挖掘與機器學習等需要疊代的mapreduce的算法。

spark 是一種與 hadoop 相似的開源叢集計算環境,但是兩者之間還存在一些不同之處,這些有用的不同之處使 spark 在某些工作負載方面表現得更加優越,換句話說,spark 啟用了記憶體分布資料集,除了能夠提供互動式查詢外,它還可以優化疊代工作負載。

spark 是在 scala 語言中實作的,它将 scala 用作其應用程式架構。與 hadoop 不同,spark 和 scala 能夠緊密內建,其中的 scala 可以像操作本地集合對象一樣輕松地操作分布式資料集。

盡管建立 spark 是為了支援分布式資料集上的疊代作業,但是實際上它是對 hadoop 的補充,可以在 hadoop 檔案系統中并行運作。通過名為 mesos 的第三方叢集架構可以支援此行為。spark 由加州大學伯克利分校 amp 實驗室 (algorithms, machines, and people lab) 開發,可用來建構大型的、低延遲的資料分析應用程式。

spark的性能特點:

1.更快的速度:記憶體計算下,spark 比 hadoop 快100倍。

記憶體計算引擎,提供cache機制來支援需要反複疊代計算或者多次資料共享,減少資料讀取的i/o開銷

dag引擎,減少多次計算之間中間結果寫到hdfs的開銷;

使用多線程池模型來減少task啟動開銷,shuffle過程中避免不必要的sort操作已經減少磁盤i/o操作;

2.易用性:

spark 提供了80多個進階運算符。

提供了豐富的api,支援java,scala,python和r四種語言;

代碼量比mapreduce少2~5倍;

3.通用性:spark 提供了大量的庫,包括sql、dataframes、mllib、graphx、spark streaming。 開發者可以在同一個應用程式中無縫組合使用這些庫。

4.支援多種資料總管:spark 支援 hadoop yarn,apache mesos,及其自帶的獨立叢集管理器

spark基本原理:

spark streaming:建構在spark上處理stream資料的架構,基本的原理是将stream資料分成小的時間片斷(幾秒),以類似batch批量處理的方式來處理這小部分資料。spark streaming建構在spark上,一方面是因為spark的低延遲執行引擎(100ms+),雖然比不上專門的流式資料處理軟體,也可以用于實時計算,另一方面相比基于record的其它處理架構(如storm),一部分窄依賴的rdd資料集可以從源資料重新計算達到容錯處理目的。此外小批量處理的方式使得它可以同時相容批量和實時資料處理的邏輯和算法。友善了一些需要曆史資料和實時資料聯合分析的特定應用場合。

spark背景:

1.mapreduce局限性:

大資料系列之并行計算引擎Spark介紹

1.僅支援map和reduce兩種操作;

2.處理效率低效;不适合疊代計算(如機器學習、圖計算等),互動式處理(資料挖掘)和流失處理(日志分析)

3.map中間結果需要寫磁盤,reduce寫hdfs,多個mr之間通過hdfs交換資料;

4.任務排程和啟動開銷大;

5.無法充分利用記憶體;(與mr産生時代有關,mr出現時記憶體價格比較高,采用磁盤存儲代價小)

6.map端和reduce端均需要排序;

7.mapreduce程式設計不夠靈活。(比較scala函數式程式設計而言)

8.架構多樣化[采用一種架構技術(spark)同時實作批處理、流式計算、互動式計算]:

批處理:mapreduce、hive、pig;

流式計算:storm

互動式計算:impala

spark核心概念:

rdd:resilient distributed datasets,彈性分布式資料集

大資料系列之并行計算引擎Spark介紹

分布在叢集中的隻讀對象集合(由多個partition 構成);

可以存儲在磁盤或記憶體中(多種存儲級别);

通過并行“轉換”操作構造;

失效後自動重構;

rdd基本操作(operator)

大資料系列之并行計算引擎Spark介紹

transformation具體内容

map(func) :傳回一個新的分布式資料集,由每個原元素經過func函數轉換後組成

filter(func) : 傳回一個新的資料集,由經過func函數後傳回值為true的原元素組成

*flatmap(func) : 類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(是以,func函數的傳回值是一個seq,而不是單一進制素)

flatmap(func) : 類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(是以,func函數的傳回值是一個seq,而不是單一進制素)

sample(withreplacement, frac, seed) :

根據給定的随機種子seed,随機抽樣出數量為frac的資料。

union(otherdataset) : 傳回一個新的資料集,由原資料集和參數聯合而成

groupbykey([numtasks]) :

在一個由(k,v)對組成的資料集上調用,傳回一個(k,seq[v])對的資料集。注意:預設情況下,使用8個并行任務進行分組,你可以傳入numtask可選參數,根據資料量設定不同數目的task

reducebykey(func, [numtasks]) : 在一個(k,v)對的資料集上使用,傳回一個(k,v)對的資料集,key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。

join(otherdataset, [numtasks]) :

在類型為(k,v)和(k,w)類型的資料集上調用,傳回一個(k,(v,w))對,每個key中的所有元素都在一起的資料集

groupwith(otherdataset, [numtasks]) : 在類型為(k,v)和(k,w)類型的資料集上調用,傳回一個資料集,組成元素為(k, seq[v], seq[w]) tuples。這個操作在其它架構,稱為cogroup

cartesian(otherdataset) : 笛卡爾積。但在資料集t和u上調用時,傳回一個(t,u)對的資料集,所有元素互動進行笛卡爾積。

flatmap(func) :

類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(是以,func函數的傳回值是一個seq,而不是單一進制素)

actions具體内容

reduce(func) : 通過函數func聚集資料集中的所有元素。func函數接受2個參數,傳回一個值。這個函數必須是關聯性的,確定可以被正确的并發執行

collect() : 在driver的程式中,以數組的形式,傳回資料集的所有元素。這通常會在使用filter或者其它操作後,傳回一個足夠小的資料子集再使用,直接将整個rdd集collect傳回,很可能會讓driver程式oom

count() : 傳回資料集的元素個數

take(n) : 傳回一個數組,由資料集的前n個元素組成。注意,這個操作目前并非在多個節點上,并行執行,而是driver程式所在機器,單機計算所有的元素(gateway的記憶體壓力會增大,需要謹慎使用)

first() : 傳回資料集的第一個元素(類似于take(1))

saveastextfile(path) : 将資料集的元素,以textfile的形式,儲存到本地檔案系統,hdfs或者任何其它hadoop支援的檔案系統。spark将會調用每個元素的tostring方法,并将它轉換為檔案中的一行文本

saveassequencefile(path) : 将資料集的元素,以sequencefile的格式,儲存到指定的目錄下,本地系統,hdfs或者任何其它hadoop支援的檔案系統。rdd的元素必須由key-value對組成,并都實作了hadoop的writable接口,或隐式可以轉換為writable(spark包括了基本類型的轉換,例如int,double,string等等)

foreach(func) : 在資料集的每一個元素上,運作函數func。這通常用于更新一個累加器變量,或者和外部存儲系統做互動

算子分類

大緻可以分為三大類算子:

value資料類型的transformation算子,這種變換并不觸發送出作業,針對處理的資料項是value型的資料。

key-value資料類型的transfromation算子,這種變換并不觸發送出作業,針對處理的資料項是key-value型的資料對。

action算子,這類算子會觸發sparkcontext送出job作業。

大資料系列之并行計算引擎Spark介紹

spark rdd cache/persist

spark rdd cache

1.允許将rdd緩存到記憶體中或磁盤上,以便于重用

2.提供了多種緩存級别,以便于使用者根據實際需求進行調整

大資料系列之并行計算引擎Spark介紹

3.cache使用

大資料系列之并行計算引擎Spark介紹

之前用mapreduce實作過wordcount,現在我們用scala實作下wordcount.是不是很簡潔呢?!

import org.apache.spark.{sparkconf, sparkcontext} 

object sparkwordcount{ 

 def main(args: array[string]) { 

 if (args.length == 0) { 

 system.err.println("usage: sparkwordcount <inputfile> <outputfile>") 

 system.exit(1) 

 } 

 val conf = new sparkconf().setappname("sparkwordcount") 

 val sc = new sparkcontext(conf) 

 val file=sc.textfile("file:///hadooplearning/spark-1.5.1-bin-hadoop2.4/readme.md") 

 val counts=file.flatmap(line=>line.split(" ")) 

 .map(word=>(word,1)) 

 .reducebykey(_+_) 

 counts.saveastextfile("file:///hadooplearning/spark-1.5.1-bin-hadoop2.4/countreslut.txt") 

關于rdd的transformation與action的特點我們介紹下;

1.接口定義方式不同:

transformation: rdd[x]–>rdd[y]

action:rdd[x]–>z (z不是一個rdd,可能是一個基本類型,數組等)

2.惰性執行:

transformation:隻會記錄rdd轉化關系,并不會觸發計算

action:是觸發程式執行(分布式)的算子。

大資料系列之并行計算引擎Spark介紹

程式的執行流程:

大資料系列之并行計算引擎Spark介紹

spark運作模式:

local(本地模式):

1.單機運作,通常用于測試;

local:隻啟動一個executor

local[k]:啟動k個executor

local[*]:啟動跟cpu數目相同的executor

2.standalone(獨立模式)

獨立運作在一個叢集中

大資料系列之并行計算引擎Spark介紹

3.yarn/mesos

1.運作在資源管理系統上,比如yarn或mesos

2.spark on yarn存在兩種模式

yarn-client

大資料系列之并行計算引擎Spark介紹

yanr-cluster

大資料系列之并行計算引擎Spark介紹

兩種方式的差別:

大資料系列之并行計算引擎Spark介紹

spark在企業中的應用場景

基于日志資料的快速查詢系統業務;

建構于spark之上的sparksql ,利用其快速以及記憶體表等優勢,承擔了日志資料的即席查詢工作。

典型算法的spark實作

預測使用者的廣告點選機率;

計算兩個好友間的共同好友數;

用于etl的sparksql和dag任務。

本文作者:佚名

來源:51cto

繼續閱讀