天天看點

Spark pairRDD(鍵值對)操作:聚合、分組、連接配接、排序

Spark 為包含鍵值對類型的RDD 提供了一些專有的操作。這些RDD 被稱為pair RDD。Pair RDD 是很多程式的構成要素,因為它們提供了并行操作各個鍵或跨節點重新進行資料分組的操作接口。我們通常從一個RDD 中提取某些字段(例如代表事件時間、使用者ID 或者其他辨別符的字段),并使用這些字段作為pair RDD 操作中的鍵。

1 Pair RDD的轉化操作

Pair RDD 可以使用所有标準RDD 上的可用的轉化操作。由于pair RDD 中包含二進制組,是以需要傳遞的函數應當操作二進制組而不是獨立的元素。

Spark pairRDD(鍵值對)操作:聚合、分組、連接配接、排序

filter

Pair RDD 也還是RDD(元素為Java 或Scala 中的Tuple2 對象或Python 中的元組),是以同樣支援RDD 所支援的函數。例如,我們可以對pair RDD篩選掉長度超過20個字元的行:

Spark pairRDD(鍵值對)操作:聚合、分組、連接配接、排序

mapValues

有時,我們隻想通路pair RDD的值部分,這時操作二進制組很麻煩。由于這是一種常見的使用模式,是以Spark 提供了mapValues(func) 函數,功能類似于map{case (x, y): (x,func(y))}。可以在很多例子中使用這個函數。接下來就依次讨論pair RDD 的各種操作,先從聚合操作開始。

1.1 聚合操作

當資料集以鍵值對形式組織的時候,聚合具有相同鍵的元素進行一些統計是很常見的操作。之前講解過基礎RDD上的fold()、combine()、reduce() 等行動操作,pair RDD 上則有相應的針對鍵的轉化操作。Spark 有一組類似的操作,可以組合具有相同鍵的值。這些操作傳回RDD,是以它們是轉化操作而不是行動操作。

reduceByKey

reduceByKey() 與reduce() 相當類似;它們都接收一個函數,并使用該函數對值進行合并。reduceByKey() 會為資料集中的每個鍵進行并行的歸約操作,每個歸約操作會将鍵相同的值合并起來。因為資料集中可能有大量的鍵,是以reduceByKey() 沒有被實作為向使用者程式傳回一個值的行動操作。實際上,它會傳回一個由各鍵和對應鍵歸約出來的結果值組成的新的RDD。

Spark pairRDD(鍵值對)操作:聚合、分組、連接配接、排序

foldByKey

foldByKey() 則與fold() 相當類似;它們都使用一個與RDD 和合并函數中的資料類型相

同的零值作為初始值。與fold() 一樣,foldByKey() 操作所使用的合并函數對零值與另一個元素進行合并,結果仍為該元素。

對于解決經典的分布式單詞計數問題,可以使用flatMap() 來生成以單詞為鍵、以數字1 為值的pair RDD,然後使用reduceByKey() 對所有的單詞進行計數。

val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
           

事實上,我們可以對第一個RDD 使用countByValue() 函數,以更快地實作

單詞計數:input.flatMap(x => x.split(" ")).countByValue()。countByValue統計一個RDD中各個value(這裡即各個單詞)的出現次數。傳回一個map,map的key是元素的值,value是出現的次數。

combineByKey

combineByKey() 是最為常用的基于鍵進行聚合的函數。大多數基于鍵聚合的函數都是用它實作的。和aggregate() 一樣,combineByKey() 可以讓使用者傳回與輸入資料的類型不同的傳回值。

要了解combineByKey(), 要先了解它在處理資料時是如何處理每個元素的。由于

combineByKey() 會周遊分區中的所有元素,是以每個元素的鍵要麼還沒有遇到過,要麼就和之前的某個元素的鍵相同。

如果這是一個新的元素,combineByKey() 會使用一個叫作createCombiner() 的函數來建立那個鍵對應的累加器的初始值(該鍵對應的value,初始計數1)。需要注意的是,這一過程會在每個分區中第一次出現各個鍵時發生,而不是在整個RDD 中第一次出現一個鍵時發生。

如果這是一個在處理目前分區之前已經遇到的鍵,它會使用mergeValue() 方法将該鍵的累加器對應的目前值與這個新的值進行合并(新遇到的已存在key對應的value + 累加器的value, 累加器的計數+1)。

由于每個分區都是獨立處理的,是以對于同一個鍵可以有多個累加器。如果有兩個或者更多的分區都有對應同一個鍵的累加器,就需要使用使用者提供的mergeCombiners() 方法将各個分區的結果進行合并。

combineByKey() 有多個參數分别對應聚合操作的各個階段,因而非常适合用來解釋聚合操作各個階段的功能劃分。為了更好地示範combineByKey() 是如何工作的,下面來看看如何計算各鍵對應的平均值。

// 在Scala 中使用combineByKey() 求每個鍵對應的平均值
val result = input.combineByKey(
(v) => (v, 1), // 分區内遇到一個新的key,将其對應的value(v)和1作為累加器的初始值
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),// 分區内遇到一個之前已遇到的key時,将其對應的value(v)合并到該key對應的累加器acc中,acc._1 + v表示合并value,acc._2 + 1表示累加個數
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) //不同分區中存在同一個key對應的多個累加器acc,合并acc
).map{ case (key, acc) => (key, acc._1 / acc._2.toFloat) // combineByKey傳回(key,(key_sumvalues, key_counts)),對每個元組求key對應的平均值}
result.collectAsMap().map(println(_))
           
Spark pairRDD(鍵值對)操作:聚合、分組、連接配接、排序

每個RDD都有固定數目的分區,分區數決定了在RDD上執行操作時的并行度。在執行聚合或分組操作時,可以要求Spark 使用給定的分區數。Spark 始終嘗試根據叢集

的大小推斷出一個有意義的預設值,但是有時候你可能要對并行度進行調優來擷取更好的性能表現。

// 在Scala 中自定義reduceByKey()的并行度
val data = Seq(("a", 3), ("b", 4), ("a", 1))
sc.parallelize(data).reduceByKey((x, y) => x + y) // 預設并行度
sc.parallelize(data).reduceByKey((x, y) => x + y, 10) // 自定義并行度
           

有時,我們希望在除分組操作和聚合操作之外的操作中也能改變RDD的分區。對于

這樣的情況,Spark 提供了repartition() 函數。它會把資料通過網絡進行混洗,并創

建出新的分區集合。切記,對資料進行重新分區是代價相對比較大的操作。Spark中也有一個優化版的repartition(),叫作coalesce()。你可以使用Java或Scala中的rdd.

partitions.size()以及Python中的rdd.getNumPartitions檢視RDD的分區數,并確定調

用coalesce()時将RDD 合并到比現在的分區數更少的分區中。

1.2 資料分組

groupByKey

對于有鍵的資料,一個常見的用例是将資料根據鍵進行分組——比如檢視一個顧客的所有訂單。如果資料已經以預期的方式提取了鍵,groupByKey() 就會使用RDD中的鍵來對資料進行分組。對于一個由類型K的鍵和類型V的值組成的RDD,所得到的結果RDD類型會是[K, Iterable[V]]。

groupBy

groupBy()可以用于未成對的資料上,也可以根據除鍵相同以外的條件進行分組。它可以接收一個函數,對源RDD中的每個元素使用該函數,将傳回結果作為鍵再進行分組。

val a = sc.parallelize(1 to 9, 3) 
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect//分成兩組 
/*結果 
Array(
(even,ArrayBuffer(2, 4, 6, 8)),
(odd,ArrayBuffer(1, 3, 5, 7, 9))
)
*/
           
val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int) : Int =
{
  a % 2//分成兩組
}
a.groupBy(myfunc).collect
/*結果
Array(
(0,ArrayBuffer(2, 4, 6, 8)),
(1,ArrayBuffer(1, 3, 5, 7, 9))
)
*/
           

cogroup

除了對單個RDD的資料進行分組,還可以使用一個叫作cogroup()的函數對多個共享同

一個鍵的RDD進行分組。對兩個鍵的類型均為K 而值的類型分别為V和W的RDD進行

cogroup()時,得到的結果RDD類型為[(K, (Iterable[V], Iterable[W]))]。如果其中的一個RDD對于另一個RDD中存在的某個鍵沒有對應的記錄,那麼對應的疊代器則為空。

val DBName=Array(
  Tuple2(1,"Spark"),
  Tuple2(2,"Hadoop"),
  Tuple2(3,"Kylin"),
  Tuple2(4,"Flink")
)
val numType=Array(
  Tuple2(1,"String"),
  Tuple2(2,"int"),
  Tuple2(3,"byte"),
  Tuple2(4,"bollean"),
  Tuple2(5,"float"),
  Tuple2(1,"34"),
  Tuple2(1,"45"),
  Tuple2(2,"47"),
  Tuple2(3,"75"),
  Tuple2(4,"95"),
  Tuple2(5,"16"),
  Tuple2(1,"85")
)
val names=sc.parallelize(DBName)
val types=sc.parallelize(numType)
val nameAndType=names.cogroup(types)  
nameAndType.collect.foreach(println)
/*結果
(4,(CompactBuffer(Flink),CompactBuffer(bollean, 95)))
(1,(CompactBuffer(Spark),CompactBuffer(String, 34, 45, 85)))
(3,(CompactBuffer(Kylin),CompactBuffer(byte, 75)))
(5,(CompactBuffer(),CompactBuffer(float, 16)))
(2,(CompactBuffer(Hadoop),CompactBuffer(int, 47)))
*/
           

1.3 連接配接

将有鍵的資料與另一組有鍵的資料一起使用是對鍵值對資料執行的最有用的操作之一。連接配接資料可能是pair RDD 最常用的操作之一。連接配接方式多種多樣:右外連接配接、左外連接配接、交叉連接配接以及内連接配接。

Spark pairRDD(鍵值對)操作:聚合、分組、連接配接、排序
Spark pairRDD(鍵值對)操作:聚合、分組、連接配接、排序

join

普通的join 操作符表示内連接配接2。隻有在兩個pair RDD 中都存在的鍵才叫輸出。當一個輸入對應的某個鍵有多個值時,生成的pair RDD 會包括來自兩個輸入RDD 的每一組相對應的記錄。

storeAddress = {
(Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"),
(Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle")}

storeRating = {
(Store("Ritual"), 4.9), (Store("Philz"), 4.8))}

storeAddress.join(storeRating) == {
(Store("Ritual"), ("1026 Valencia St", 4.9)),
(Store("Philz"), ("748 Van Ness Ave", 4.8)),
(Store("Philz"), ("3101 24th St", 4.8))}
           

leftOuterJoin / rightOuterJoin

有時,我們不希望結果中的鍵必須在兩個RDD 中都存在。例如,在連接配接客戶資訊與推薦時,如果一些客戶還沒有收到推薦,我們仍然不希望丢掉這些顧客。

leftOuterJoin(other)和rightOuterJoin(other) 都會根據鍵連接配接兩個RDD,但是允許結果中存在其中的一個pair RDD 所缺失的鍵。

在使用leftOuterJoin() 産生的pair RDD 中,源RDD 的每一個鍵都有對應的記錄。每個

鍵相應的值是由一個源RDD 中的值與一個包含第二個RDD 的值的Option(在Java 中為

Optional)對象組成的二進制組。在Python 中,如果一個值不存在,則使用None 來表示;而資料存在時就用正常的值來表示,不使用任何封裝。和join() 一樣,每個鍵可以得到多條記錄;當這種情況發生時,我們會得到兩個RDD 中對應同一個鍵的兩組值的笛卡爾積。

rightOuterJoin() 幾乎與leftOuterJoin() 完全一樣,隻不過預期結果中的鍵必須出現在

第二個RDD 中,而二進制組中的可缺失的部分則來自于源RDD 而非第二個RDD。

storeAddress = {
(Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"),
(Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle")}

storeRating = {
(Store("Ritual"), 4.9), (Store("Philz"), 4.8))}

storeAddress.leftOuterJoin(storeRating) ==
{(Store("Ritual"),("1026 Valencia St",Some(4.9))),
(Store("Starbucks"),("Seattle",None)),
(Store("Philz"),("748 Van Ness Ave",Some(4.8))),
(Store("Philz"),("3101 24th St",Some(4.8)))}

storeAddress.rightOuterJoin(storeRating) ==
{(Store("Ritual"),(Some("1026 Valencia St"),4.9)),
(Store("Philz"),(Some("748 Van Ness Ave"),4.8)),
(Store("Philz"), (Some("3101 24th St"),4.8))}
           

1.4 資料排序

sortByKey

我們經常要将RDD 倒序排列,是以sortByKey() 函數接收一個叫作ascending 的參數,表示我們是否想要讓結果按升序排序(預設值為true)。有時我們也可能想按完全不同的排序依據進行排序。要支援這種情況,我們可以提供自定義的比較函數。下面會将整數轉為字元串,然後使用字元串比較函數來對RDD 進行排序。

scala> val b = sc.parallelize(List(3,1,9,12,4))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at <console>:12
 
scala> val c = b.zip(a)
c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[39] at zip at <console>:16
 
scala> c.sortByKey().collect
res15: Array[(Int, String)] = Array((1,iteblog), (3,wyp), (4,test), (9,com), (12,397090770))
 
scala> implicit val sortIntegersByString = new Ordering[Int]{
     | override def compare(a: Int, b: Int) =
     | a.toString.compare(b.toString)}
sortIntegersByString: Ordering[Int] = $iwC$$iwC$$iwC$$iwC$$iwC$$anon$1@5d533f7a
 
scala>  c.sortByKey().collect
res17: Array[(Int, String)] = Array((1,iteblog), (12,397090770), (3,wyp), (4,test), (9,com))
 
scala> c.sortByKey().collect
res11: Array[(String, Int)] = Array((397090770,4), (com,3), (iteblog,2), (test,5), (wyp,1))
           

例子中的sortIntegersByString就是修改了預設的排序規則。這樣将預設按照Int大小排序改成了對字元串的排序,是以12會排序在3之前。

1.4 Pair RDD的行動操作

和轉化操作一樣,所有基礎RDD 支援的傳統行動操作也都在pair RDD 上可用。Pair RDD提供了一些額外的行動操作,可以讓我們充分利用資料的鍵值對特性。

Spark pairRDD(鍵值對)操作:聚合、分組、連接配接、排序

參考 《Spark快速大資料分析》

繼續閱讀