---------------------
本節内容:
· 鍵值對RDD出現背景
· 鍵值對RDD轉化操作執行個體
· 鍵值對RDD行動操作執行個體
· 鍵值對RDD資料分區
· 參考資料
雖然大部分Spark的RDD操作都支援所有種類的對象,但是有少部分特殊的操作隻能作用于鍵值對類型的RDD。這類操作中最常見的就是分布的shuffle操作,比如将元素通過鍵來分組或聚集計算.是以,鍵值對RDD也是很多程式不可缺失的一部分.

一、鍵值對RDD出現背景
mapreduce架構是把資料轉化為Key-value,再聚合為key-values的過程,在spark裡key-value rdd(pair rdd)同樣是最常用的,在每個應用中基本會用到,pair rdd裡面的元素是Tuple2,pair rdd的transform函數很多.pari rdd是很多程式的構成要素,因為他們提供了并行性操作各個鍵或跨節點重新進行分組的操作接口.pair rdd最簡單的2種建立方法:
(1)通過map建立執行個體所示:
val line =sc.textFile("/tmp/test/core-site.xml");
val pairs=line.map(x=>(x.split(" ")(0),x));
pairs.foreach(println);
------
說明:
map:讀取将讀取的每一行用空格的第一行為key,整行内容為value
foreach:循環周遊列印出每個pair
(2)直接讀取鍵值對類型的資料
val pairrdd = sc.parallelize(List((1,2),(3,4),(3,6)));
pairrdd.foreach(println);
parallelize:從外部資料集讀取鍵值對資料
二、鍵值對RDD轉化操作執行個體
1.例子:reduceByKey/groupByKey/
val result=pairrdd.reduceByKey((x,y)=>x+y);
result.foreach(println);
reduceByKey:合并具有相同鍵的值
val result=pairrdd.groupByKey();
groupByKey:将同一個key的值都放到一個清單中,通過ShuffledRDD将每個partition中fetch過來,shuffle機制預設用的是hashShuffle,spark1.1版本引入sorted shuffle,速度更快。shuffle操作後面接着mapPartition()操作,生成MapPartitionRDD.
2.例子:combineByKey/mapvalues
val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0));
val d1 = sc.parallelize(initialScores);
type MVType = (Int, Double) //定義一個元組類型(科目計數器,分數);
d1.combineByKey(
score => (1, score),
(c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
(c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
).map { case (name, (num, socre)) => (name, socre / num) }.collect().foreach(println);
combineByKey:
a .score => (1, score),我們把分數作為參數,并傳回了附加的元組類型。 以"Fred"為列,目前其分數為88.0 =>(1,88.0) 1表示目前科目的計數器,此時隻有一個科目
b.(c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),注意這裡的c1就是createCombiner初始化得到的(1,88.0)。在一個分區内,我們又碰到了"Fred"的一個新的分數91.0。當然我們要把之前的科目分數和目前的分數加起來即c1._2 + newScore,然後把科目電腦加1即c1._1 + 1
c.(c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2),注意"Fred"可能是個學霸,他選修的科目可能過多而分散在不同的分區中。所有的分區都進行mergeValue後,接下來就是對分區間進行合并了,分區間科目數和科目數相加分數和分數相加就得到了總分和總科目數
結果:
(Wilma,95.33333333333333)
(Fred,91.33333333333333)
val result=pairrdd.mapValues(x=>x+1);
mapValues:對每個鍵的值應用一個函數而不改變鍵的内容
3.例子:flatMapValues/keys/values/sotByKey
val result=pairrdd.flatMapValues(x=>(x to 5));
flatMapValues:對每個值應用一個傳回疊代器函數,然後對傳回的每個值都生成一個對應原鍵的鍵值對記錄,通常用于符号化.
結果:
(1,2)
(1,3)
(1,4)
(1,5)
(3,4)
(3,5)
val result=pairrdd.keys;//scala不要使用括号
keys:對傳回一個僅包含鍵的RDD.
val result=pairrdd.values;//scala不要使用括号
values:對傳回一個僅包含鍵的RDD.
val result=pairrdd.sortByKey(false);
sortByKey:對傳回一個根據鍵排序的RDD.預設不填為true,ascending升序方式
4.例子:subtractByKey/join/leftOuterJoin/rightOuterJoin/coGroup
val rdd= sc.parallelize(List((1,2),(3,4),(3,6)));
val other= sc.parallelize(List((3,6)));
val result=rdd.subtract(other);
說明:删除rdd RDD 鍵與other RDD中的鍵相同的元素.
subtract:對傳回一個根據鍵排序的RDD.
val other= sc.parallelize(List((3,9)));
val result=rdd.join(other);
join:對2個RDD進行内連接配接,key相同的進行操作
(3,(4,9))
(3,(6,9))
val result=rdd.leftOuterJoin(other);
leftOuterJoin:左外連接配接,對2個rdd進行連接配接操作,確定左邊(rdd RDD)的鍵一定存在
(1,(2,none))
(3,(4,some(9)))
(3,(6,some(9)))
val result=rdd.rightOuterJoin(other);
rightOuterJoin:右外連接配接,對2個rdd進行連接配接操作,確定右邊(rdd RDD)的鍵一定存在
(3,(some(4),9))
(3,(some(6),9))
val result=rdd.cogroup(other);
cogroup:将2個RDD擁有相同鍵的資料分組到一起
(1,(compactBuffer(2),compactBuffer()))
(3,(compactBuffer(4,6),compactBuffer(9)))
三、鍵值對RDD行動操作執行個體
和轉換操作一樣,所有基礎RDD支援的傳統行動操作也都在pair RDD上可用,除此之外,pair RDD還提供了一些額外的行動操作。
1.例子:countByKey/collectAsMap/lookup(key)
val result=rdd.countByKey();
countByKey:對每個鍵對應的元素分别計數
(1,1)
(3,2)
val result=rdd.collectAsMap();
collectAsMap:從結果我們可以看出,如果RDD中同一個Key中存在多個Value,那麼後面的Value将會把前面的Value覆寫,最終得到的結果就是Key唯一,而且對應一個Value,《Spark快速大資料分析》第52頁給出的結果是不對的。
(3,6)
val result=rdd.lookup(3);
lookup:傳回給定鍵對應的所有值
4
6