天天看點

【spark系列3】spark開發簡單指南分布式資料集建立之textFile分布式資料集操作之轉換和動作資料集操作之map和reduce轉換是惰性的重要轉換操作之caching(緩存)眼下支援的轉換(transformation)眼下支援的動作(actions)兩種共享變量之廣播變量和累加器spark的例子程式參考資料

文本檔案的RDDs能夠通過SparkContext的textFile方法建立,該方法接受檔案的URI位址(或者機器上的檔案本地路徑,或者一個hdfs://,

sdn://,kfs://,其他URI).這裡是一個調用樣例:

scala> val distFile =

sc.textFile(“data.txt”)

distFile: spark.RDD[String] =

spark.HadoopRDD@1d4cee08

分布式資料集支援兩種操作:

轉換(transformations):依據現有的資料集建立一個新的資料集

動作(actions):在資料集上執行計算後,傳回一個值給驅動程式

 一旦被建立,distFile能夠進行資料集操作。比如,我們能夠使用例如以下的map和reduce操作将全部行數的長度相加:

distFile.map(_.size).reduce(_ + _ )

方法也接受可選的第二參數,來控制檔案的分片數目。預設來說,Spark為每一塊檔案建立一個分片(HDFS預設的塊大小為64MB),可是你能夠通過傳入一個更大的值來指定很多其它的分片。注意,你不能指定一個比塊個數更少的片值(和hadoop中,Map數不能小于Block數一樣)

Map是一個轉換,将資料集的每個元素,都經過一個函數進行計算後,傳回一個新的分布式資料集作為結果。

Reduce是一個動作,将資料集的全部元素,用某個函數進行聚合,然後将終于結果傳回驅動程式,而并行的reduceByKey還是傳回一個分布式資料集

全部Spark中的轉換都是惰性的,也就是說,并不會立即發生計算。相反的,它僅僅是記住應用到基礎資料集上的這些轉換(Transformation)。

 而這些轉換(Transformation),僅僅會在有一個動作(Action)發生,要求傳回結果給驅動應用時,才真正進行計算。這個設計讓Spark更加有效率的執行。比如,我們能夠實作,通過map建立一個資料集,然後再用reduce,而僅僅傳回reduce的結果給driver,而不是整個大的資料集。

spark提供的一個重要轉換操作是Caching。當你cache一個分布式資料集時,每一個節點會存儲該資料集的全部片,并在記憶體中計算,并在其他操作中重用。這将會使得興許的計算更加的高速(一般是10倍),緩存是spark中一個構造疊代算法的關鍵工具,也能夠在解釋器中互動使用。

調用RDD的cache()方法,能夠讓它在第一次計算後,将結果保持存儲在記憶體。資料集的不同部分,将會被存儲在計算它的不同的叢集節點上,讓興許的資料集使用更快。緩存是有容錯功能的,假設任一分區的RDD資料丢失了,它會被使用原來建立它的轉換,再計算一次(不須要所有又一次計算,僅僅計算丢失的分區)。

Transformation

Meaning

map(func)

傳回一個新的分布式資料集,由每一個原元素經過func函數轉換後組成

filter(func)

傳回一個新的資料集,由經過func函數後傳回值為true的原元素組成

flatMap(func)

類似于map,可是每個輸入元素,會被映射為0到多個輸出元素(是以,func函數的傳回值是一個Seq,而不是單一進制素)

sample(withReplacement, frac, seed)

依據給定的随機種子seed,随機抽樣出數量為frac的資料

union(otherDataset)

傳回一個新的資料集,由原資料集和參數聯合而成

groupByKey([numTasks])

在一個由(K,V)對組成的資料集上調用,傳回一個(K,Seq[V])對的資料集。注意:預設情況下,使用8個并行任務進行分組,你能夠傳入numTask可選參數,依據資料量設定不同數目的Task

(groupByKey和filter結合,能夠實作類似Hadoop中的Reduce功能)

reduceByKey(func, [numTasks])

在一個(K,V)對的資料集上使用,傳回一個(K,V)對的資料集,key同樣的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是能夠通過第二個可選參數來配置的。

join(otherDataset, [numTasks])

在類型為(K,V)和(K,W)類型的資料集上調用,傳回一個(K,(V,W))對,每一個key中的全部元素都在一起的資料集

groupWith(otherDataset, [numTasks])

在類型為(K,V)和(K,W)類型的資料集上調用,傳回一個資料集,組成元素為(K, Seq[V], Seq[W]) Tuples。這個操作在其他架構,稱為CoGroup

cartesian(otherDataset)

笛卡爾積。但在資料集T和U上調用時,傳回一個(T,U)對的資料集,全部元素互動進行笛卡爾積。

sortByKey([ascendingOrder])

在類型為( K, V )的資料集上調用,傳回以K為鍵進行排序的(K,V)對資料集。升序或者降序由boolean型的ascendingOrder參數決定

(類似于Hadoop的Map-Reduce中間階段的Sort,按Key進行排序)

Action

reduce(func)

通過函數func聚集資料集中的全部元素。Func函數接受2個參數,傳回一個值。這個函數必須是關聯性的,確定能夠被正确的并發運作

collect()

在Driver的程式中,以數組的形式,傳回資料集的全部元素。這一般會在使用filter或者其他操作後,傳回一個足夠小的資料子集再使用,直接将整個RDD集Collect傳回,非常可能會讓Driver程式OOM

count()

傳回資料集的元素個數

take(n)

傳回一個數組,由資料集的前n個元素組成。注意,這個操作眼下并不是在多個節點上,并行運作,而是Driver程式所在機器,單機計算全部的元素

(Gateway的記憶體壓力會增大,須要慎重使用)

first()

傳回資料集的第一個元素(類似于take(1))

saveAsTextFile(path)

将資料集的元素,以textfile的形式,儲存到本地檔案系統,hdfs或者不論什麼其他hadoop支援的檔案系統。Spark将會調用每一個元素的toString方法,并将它轉換為檔案裡的一行文本

saveAsSequenceFile(path)

将資料集的元素,以sequencefile的格式,儲存到指定的檔案夾下,本地系統,hdfs或者不論什麼其他hadoop支援的檔案系統。RDD的元素必須由key-value對組成,并都實作了Hadoop的Writable接口,或隐式能夠轉換為Writable(Spark包含了基本類型的轉換,比如Int,Double,String等等)

foreach(func)

在資料集的每個元素上,執行函數func。這通經常使用于更新一個累加器變量,或者和外部存儲系統做互動

一般來說,當一個函數被傳遞給Spark操作(比如map和reduce),一般是在叢集結點上執行,在函數中使用到的全部變量,都做分别拷貝,供函數操作,而不會互相影響。這些變量會被複制到每一台機器,而在遠端機器上,在對變量的全部更新,都不會被傳播回Driver程式。然而,Spark提供兩種有限的共享變量,供兩種公用的使用模式:廣播變量和累加器。

廣播變量同意程式猿保留一個僅僅讀的變量,緩存在每一台機器上,而非每一個任務儲存一份拷貝。他們能夠使用,比如,給每一個結點一個大的輸入資料集,以一種高效的方式。Spark也會嘗試,使用一種高效的廣播算法,來降低溝通的損耗。

廣播變量是從變量V建立的,通過調用SparkContext.broadcast(v)方法。這個廣播變量是一個v的分裝器,它的僅僅能夠通過調用value方法獲得。例如以下的解釋器子產品展示了怎樣應用:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar: spark.Broadcast[Array[Int]] =

spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

scala>

broadcastVar.value

res0: Array[Int] = Array(1, 2, 3)

在廣播變量被建立後,它能在叢集執行的不論什麼函數上,被代替v值進行調用,進而v值不須要被再次傳遞到這些結點上。另外,對象v不能在被廣播後改動,是僅僅讀的,進而保證全部結點的變量,收到的都是一模一樣的。

  累加器是僅僅能通過組合操作“加”起來的變量,能夠高效的被并行支援。他們能夠用來實作計數器(如同MapReduce中)和求和。Spark原生就支援Int和Double類型的計數器,程式猿能夠加入新的類型。

  一個計數器,能夠通過調用SparkContext.accumulator(V)方法來建立。執行在叢集上的任務,能夠使用+=來加值。然而,它們不能讀取計數器的值。當Driver程式須要讀取值的時候,它能夠使用.value方法。

        例如以下的解釋器,展示了怎樣利用累加器,将一個數組裡面的全部元素相加

scala> val accum = sc.accumulator(0)

accum: spark.Accumulator[Int] =

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum +=

x)

10/09/29 18:41:08 INFO SparkContext: Tasks finished in

0.317106 s

scala> accum.value

res2: Int = 10

        在Spark的站點上,你能夠看到。

  另外,Spark包含了一些例子,在examples/src/main/scala上,有些既有Spark版本号,又有本地非并行版本号,同意你看到假設要讓程式以叢集化的方式跑起來的話,須要做什麼改變。你能夠執行它們,通過将類名傳遞給spark中的run腳本

— 比如./run spark.examples.SparkPi. 每個例子程式,都會列印使用幫助,當執行時沒不論什麼參數時。

1.spark随談——開發指南(譯)

/*

注:

本文全部内容來自參考資料1。

轉載請注明來源:

*/