
RDD Operators

1. Common Transformation operations:

          (1) map(func): returns a new RDD formed by passing each element of the source RDD through the function func.

          (2) filter(func): returns a new RDD formed by selecting those source elements for which func returns true.

          (3) sortBy(func, [ascending], [numTasks]): returns a new RDD whose elements are sorted by the result of applying func to each element. (ascending defaults to true; true sorts in ascending order, false in descending order.)

val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
// double every element, then sort the results numerically in ascending order
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)
val rdd3 = rdd2.filter(_>10)
// sorting by the string form gives lexicographic order: 10, 12, ..., 18, 2, 20, 4, 6, 8
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x+"",true)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x.toString,true)
           

          (4) flatMap(func): similar to map, but each input element can be mapped to 0 or more output elements (so func should return a sequence rather than a single element). Equivalent to a map followed by a flatten.

val rdd4 = sc.parallelize(Array("a b c", "d e f", "h i j"))
rdd4.flatMap(_.split(' ')).collect
------------------------------------------------------------------
val rdd5 = sc.parallelize(List(List("a b c", "a b b"),List("e f g", "a f g"), List("h i j", "a a b")))
rdd5.flatMap(_.flatMap(_.split(" "))).collect
           

          (5) union: union of two RDDs; note that the element types must match.

          (6) intersection: intersection of two RDDs.

          (7) distinct: removes duplicate elements.

val rdd6 = sc.parallelize(List(5,6,4,7))
val rdd7 = sc.parallelize(List(1,2,3,4))
val rdd8 = rdd6.union(rdd7)
rdd8.distinct.sortBy(x=>x).collect
--------------------------------------------
val rdd9 = rdd6.intersection(rdd7)
           

          (8) join, leftOuterJoin, rightOuterJoin

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7)))
--------------------------------------------------------------------------
val rdd3 = rdd1.join(rdd2).collect
rdd3: Array[(String, (Int, Int))] = Array((tom,(1,8)), (jerry,(2,9)))
---------------------------------------------------------------------------
val rdd3 = rdd1.leftOuterJoin(rdd2).collect
rdd3: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (jerry,(2,Some(9))), (kitty,(3,None)))
---------------------------------------------------------------------------
val rdd3 = rdd1.rightOuterJoin(rdd2).collect
rdd3: Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),8)), (jerry,(Some(2),9)), (shuke,(None,7)))
           

          (9) groupByKey([numTasks]): called on an RDD of (K, V) pairs, returns an RDD of (K, Iterable[V]); it only applies to RDDs whose elements are key-value tuples.

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7)))
val rdd3 = rdd1 union rdd2
val rdd4 = rdd3.groupByKey.collect
rdd4: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(8, 1)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(9, 2)))
-----------------------------------------------------------------------------------
val rdd5 = rdd4.map(x=>(x._1,x._2.sum))
rdd5: Array[(String, Int)] = Array((tom,9), (shuke,7), (kitty,3), (jerry,11))
           

                groupBy: takes a one-argument function; the function's result is used as the key, and a new RDD[(K, Iterable[T])] is returned whose value is an iterable of all input elements that produce the same key.

scala> val rdd1=sc.parallelize(List(("a",1,2),("b",1,1),("a",4,5)))
rdd1: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[47] at parallelize at <console>:24

scala> rdd1.groupBy(_._1).collect
res18: Array[(String, Iterable[(String, Int, Int)])] = Array((a,CompactBuffer((a,1,2), (a,4,5))), (b,CompactBuffer((b,1,1))))
           

          (10) reduceByKey(func, [numTasks]): called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs in which the values for each key are aggregated with the given reduce function. As with groupByKey, the number of reduce tasks can be set through the optional second argument.

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7)))
val rdd3 = rdd1 union rdd2
val rdd6 = rdd3.reduceByKey(_+_).collect
rdd6: Array[(String, Int)] = Array((tom,9), (shuke,7), (kitty,3), (jerry,11))
           

          (11) cogroup(otherDataset, [numTasks]): called on RDDs of type (K, V) and (K, W), returns an RDD of type (K, (Iterable<V>, Iterable<W>)).

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
val rdd3 = rdd1.cogroup(rdd2).collect
rdd3: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((tom,(CompactBuffer(2, 1),CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))
----------------------------------------------------------------------------------------
val rdd4 = rdd3.map(x=>(x._1,x._2._1.sum+x._2._2.sum))
rdd4: Array[(String, Int)] = Array((tom,4), (jerry,5), (shuke,2), (kitty,2))
           

          (12) cartesian(otherDataset): Cartesian product.

val rdd1 = sc.parallelize(List("tom", "jerry"))
val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
val rdd3 = rdd1.cartesian(rdd2).collect
rdd3: Array[(String, String)] = Array((tom,tom), (tom,kitty), (tom,shuke), (jerry,tom), (jerry,kitty), (jerry,shuke))
           

2. Common Action operations:

          Once an action is triggered, a job is executed.

reduce(func): aggregates all elements of the RDD using func; func must be commutative and associative.
collect: returns all elements of the dataset to the driver program as an array.
count: returns the number of elements in the RDD.
first: returns the first element of the RDD (similar to take(1)).
take(n): returns an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]): returns an array of num elements sampled randomly from the dataset, with or without replacement; seed optionally specifies the random-number-generator seed.
takeOrdered(n, [ordering]): returns the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path): writes the elements of the dataset as a text file to HDFS or another supported file system; Spark calls toString on each element to turn it into a line of text.
saveAsSequenceFile(path): writes the elements of the dataset as a Hadoop SequenceFile to the given path on HDFS or another Hadoop-supported file system.
saveAsObjectFile(path): writes the elements of the dataset to the given path using Java serialization; the file can be loaded back with SparkContext.objectFile.
countByKey: for RDDs of (K, V) pairs, returns a (K, Int) map with the count of each key.
foreach(func): runs the function func on every element of the dataset.
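
For example, a minimal spark-shell sketch (assuming a small pair RDD built with parallelize) exercising a few of these actions:

val rdd = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
rdd.count                    // 3
rdd.first                    // (a,1)
rdd.take(2)                  // Array((a,1), (b,2))
rdd.map(_._2).reduce(_+_)    // 6
rdd.countByKey               // Map(a -> 2, b -> 1)
rdd.foreach(println)         // runs on the executors; in cluster mode the output appears in executor logs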

3. RDD programming: higher-level API

     1. mapPartitions and mapPartitionsWithIndex

          mapPartitions: operates on one whole partition at a time; the function receives an iterator over the partition's elements.
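
For example, a minimal sketch (assuming an RDD of Ints split into 2 partitions) that sums each partition with mapPartitions:

val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
// the function receives an Iterator over one whole partition and returns a new Iterator
rdd.mapPartitions(iter => Iterator(iter.sum)).collect
// Array(10, 35)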

          mapPartitionsWithIndex: operates on each partition and additionally exposes the partition index together with the values in that partition. It is a transformation.

// 1. Define a function whose signature matches what mapPartitionsWithIndex expects
val func = (index : Int,iter : Iterator[Int]) => {
  iter.toList.map(x=>"[PartID:" + index + ",val:" + x + "]").iterator
}
// 2. Create an RDD with 2 partitions
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
// 3. Call the method, passing in the custom function
rdd1.mapPartitionsWithIndex(func).collect.foreach(println)
/* Output:
[PartID:0,val:1]
[PartID:0,val:2]
[PartID:0,val:3]
[PartID:0,val:4]
[PartID:1,val:5]
[PartID:1,val:6]
[PartID:1,val:7]
[PartID:1,val:8]
[PartID:1,val:9]
*/
           

     2. aggregate: aggregation; it is an action.

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
//First each of the two partitions is summed separately; the two partial sums are then summed to give the final result
scala> rdd1.aggregate(0)(_+_,_+_)
res0: Int = 45                                                                 
//First the maximum of each partition is computed; the per-partition maxima are then summed to give the final result
scala> rdd1.aggregate(0)(math.max(_,_),_+_)
res1: Int = 13
//Note that the zero value takes part in every step. In the code below, partition 1 holds 1,2,3,4 and, compared against the zero value 5, its maximum is 5; partition 2 holds 5,6,7,8,9, so its maximum is 9. The zero value also takes part in the final combine: 5+(5+9)=19
scala> rdd1.aggregate(5)(math.max(_,_),_+_)
res0: Int = 19
-----------------------------------------------------------------------------------------------
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
//Because the partitions are computed in parallel, the order in which their results arrive is not deterministic, so two different results can appear, as shown below:
scala> rdd2.aggregate("")(_+_,_+_)
res0: String = defabc                                                                                                                    

scala> rdd2.aggregate("")(_+_,_+_)
res2: String = abcdef
//This example shows the zero-value behaviour mentioned above even more clearly: the "=" zero value takes part in three computations (once per partition plus the final combine)
scala> rdd2.aggregate("=")(_+_,_+_)
res0: String = ==def=abc
--------------------------------------------------------------------------------------
scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd3.aggregate("")((x,y)=>math.max(x.length,y.length).toString,_+_)
res1: String = 42                                                               

scala> rdd3.aggregate("")((x,y)=>math.max(x.length,y.length).toString,_+_)
res3: String = 24
-------------------------------------------------------------------------------------------
scala> val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24
//Note: partition 0 holds "12","23" and partition 1 holds "345",""; each fold starts from the zero value ""
//In partition 0, min(0,2)=0 gives "0" and then min(1,2)=1 gives "1"; in partition 1, min(0,3)=0 gives "0" and then min(1,0)=0 gives "0"; the final combine concatenates the two in either order
scala> rdd4.aggregate("")((x,y)=>math.min(x.length,y.length).toString,_+_)
res4: String = 10                                                               

scala> rdd4.aggregate("")((x,y)=>math.min(x.length,y.length).toString,_+_)
res9: String = 01                                                               
------------------------------------------------------------------------------------
//Note the difference from the example above: the elements of this RDD are in a different order, which changes the result
scala> val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd5.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y)
res1: String = 11 
           

     3. aggregateByKey: aggregates values by key.

//Define a pair RDD
scala> val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
//Custom function to pass to mapPartitionsWithIndex
scala> val func=(index:Int,iter:Iterator[(String, Int)])=>{
     | iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
     | }
func: (Int, Iterator[(String, Int)]) => Iterator[String] = <function2>
//Inspect how the data is partitioned
scala> pairRDD.mapPartitionsWithIndex(func).collect
res2: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])
//Note the difference between a zero value of 0 and other zero values
scala> pairRDD.aggregateByKey(0)(_+_,_+_).collect
res4: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))               

scala> pairRDD.aggregateByKey(10)(_+_,_+_).collect
res5: Array[(String, Int)] = Array((dog,22), (cat,39), (mouse,26))
//The difference between the next three calls: the first is easy to follow because the zero value is 0, so each partition emits the largest count seen for each animal, and the per-partition results are then summed
scala> pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))

//The next two calls use a non-zero initial value, which must be taken into account. Partition 0 holds ("cat",2), ("cat",5), ("mouse",4); with a zero value of 10, each key's values are compared against 10, so the partition emits ("cat",10), ("mouse",10). Partition 1 likewise emits ("dog",12), ("cat",12), ("mouse",10).
//Summing per key across partitions gives (dog,12), (cat,22), (mouse,20). Note that the zero value does not take part in this final per-key combine
scala> pairRDD.aggregateByKey(10)(math.max(_,_),_+_).collect
res7: Array[(String, Int)] = Array((dog,12), (cat,22), (mouse,20))
//This one works the same way as the previous call
scala> pairRDD.aggregateByKey(100)(math.max(_,_),_+_).collect
res8: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
           

     4. coalesce: returns a new RDD

          Redistributes the RDD's elements over a new number of partitions.

          When the number of partitions is reduced moderately, e.g. 1000 -> 100, Spark folds every 10 of the old partitions into one new partition; the parallelism becomes 100 and no shuffle is triggered.

          When the number of partitions is reduced drastically, e.g. 1000 -> 1, the whole computation would run with a parallelism of 1. To avoid that loss of parallelism, set shuffle to true; the work before the shuffle and the work after it then run in separate stages, with parallelism 1000 and 1 respectively.

          When the number of partitions is increased, a shuffle is unavoidable, so shuffle must be set to true.

          partitionBy: repartitions the RDD using the partitioner instance passed in, which can be a custom partitioner or the built-in HashPartitioner (a sketch follows the code below);

          repartition: returns a new RDD

               Repartitions the RDD to the given number of partitions; it always shuffles.

               When the target number of partitions is smaller than the current one, consider coalesce instead, which can avoid the shuffle.

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = rdd1.repartition(6)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at repartition at <console>:26

scala> rdd2.partitions.length
res0: Int = 6

scala> val rdd3 = rdd2.coalesce(2,true)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at coalesce at <console>:28

scala> rdd3.partitions.length
res1: Int = 2
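
As for partitionBy, mentioned above, here is a minimal sketch assuming a pair RDD and the built-in HashPartitioner:

import org.apache.spark.HashPartitioner

// partitionBy is only available on RDDs of key-value pairs
val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
val partitioned = pairs.partitionBy(new HashPartitioner(2))
partitioned.partitions.length   // 2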
           

     5. collectAsMap: converts the RDD into a Map (the RDD's elements must be key-value tuples).

scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2),("c", 2),("d", 4),("e", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> rdd1.collectAsMap
res3: scala.collection.Map[String,Int] = Map(e -> 1, b -> 2, d -> 4, a -> 1, c -> 2)
           

     6. combineByKey: has the same effect as reduceByKey; in fact reduceByKey is implemented on top of combineByKey.

//First create two RDDs, then use zip to combine them into one, rdd6
scala> val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
rdd5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24

scala> val rdd6 = rdd5.zip(rdd4)
rdd6: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[23] at zip at <console>:28

scala> rdd6.collect
res6: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))

//我們需要将按照key進行分組合并,相同的key的value都放在List中
//這裡我們第一個參數List(_):表示将第一個value取出放進集合中
//第二個參數(x:List[String],y:String)=>x :+ y:表示局部計算,将value加入到List中
//第三個參數(m:List[String],n:List[String])=>m++n:表示對局部的計算結果再進行計算

scala> val rdd7 = rdd6.combineByKey(List(_),(x:List[String],y:String)=>x :+ y,(m:List[String],n:List[String])=>m++n)
rdd7: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[24] at combineByKey at <console>:30

scala> rdd7.collect
res7: Array[(Int, List[String])] = Array((1,List(dog, cat, turkey)), (2,List(wolf, bear, bee, salmon, rabbit, gnu)))

//The first argument can also be written in other ways, as in the following two examples
scala> val rdd7 = rdd6.combineByKey(_::List(),(x:List[String],y:String)=>x :+ y,(m:List[String],n:List[String])=>m++n).collect
rdd7: Array[(Int, List[String])] = Array((1,List(turkey, dog, cat)), (2,List(wolf, bear, bee, gnu, salmon, rabbit)))

scala> val rdd7 = rdd6.combineByKey(_::Nil,(x:List[String],y:String)=>x :+ y,(m:List[String],n:List[String])=>m++n).collect
rdd7: Array[(Int, List[String])] = Array((1,List(turkey, dog, cat)), (2,List(wolf, bear, bee, gnu, salmon, rabbit)))
           

     7. countByKey, countByValue: count how many times each key (countByKey) or each element, i.e. each key-value pair here (countByValue), occurs.

scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[27] at parallelize at <console>:24

scala> rdd1.countByKey
res8: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2)           

scala> rdd1.countByValue
res9: scala.collection.Map[(String, Int),Long] = Map((c,2) -> 1, (a,1) -> 1, (b,2) -> 2, (c,1) -> 1)
           

     8. filterByRange: returns an RDD containing only the elements whose keys fall within the given range.

scala> val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1),("b",6)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
//Note: the range passed in is inclusive at both ends
scala> val rdd2 = rdd1.filterByRange("b","d")
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[34] at filterByRange at <console>:26

scala> rdd2.collect
res10: Array[(String, Int)] = Array((c,3), (d,4), (c,2), (b,6))
           

     9. flatMapValues: processes the values in the same way flatMap would, pairing the original key with each value produced.

scala> val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
rdd3: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[35] at parallelize at <console>:24

scala> val rdd4 = rdd3.flatMapValues(_.split(" "))
rdd4: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[36] at flatMapValues at <console>:26

scala> rdd4.collect
res11: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))

          mapValues: leaves the key unchanged and applies the given function only to the value of each key-value pair, similar to map. Note the difference from flatMapValues above: it does not break the key-value pair apart, it just transforms the value with the given function.

scala> val rdd3 = sc.parallelize(List(("a",(1,2)),("b",(2,4))))
rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ParallelCollectionRDD[57] at parallelize at <console>:24

scala> rdd3.mapValues(x=>x._1 + x._2).collect
res34: Array[(String, Int)] = Array((a,3), (b,6))
------------------------------------------------------------------------
If flatMapValues is used instead, the result is as follows: the value is broken apart entirely and every piece is paired with the key
scala> rdd3.flatMapValues(x=>x + "").collect
res36: Array[(String, Char)] = Array((a,(), (a,1), (a,,), (a,2), (a,)), (b,(), (b,2), (b,,), (b,4), (b,)))
           

     10. foldByKey: groups by key and folds the values within each group.

scala> val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[37] at parallelize at <console>:24

scala> val rdd2 = rdd1.map(x=>(x.length,x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[38] at map at <console>:26

scala> rdd2.collect
res12: Array[(Int, String)] = Array((3,dog), (4,wolf), (3,cat), (4,bear))
-----------------------------------------------------------------------------
scala> val rdd3 = rdd2.foldByKey("")(_+_)
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[39] at foldByKey at <console>:28

scala> rdd3.collect
res13: Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))

scala> val rdd3 = rdd2.foldByKey(" ")(_+_).collect
rdd3: Array[(Int, String)] = Array((4," bear wolf"), (3," dog cat"))
-----------------------------------------------------------------------------
//Word count using foldByKey
val rdd = sc.textFile("hdfs://node-1.itcast.cn:9000/wc").flatMap(_.split(" ")).map((_, 1))
rdd.foldByKey(0)(_+_)
           

     11. keyBy: uses the result of the given function as the key, producing a new RDD of (key, element) pairs.

scala> val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[41] at parallelize at <console>:24

scala> val rdd2 = rdd1.keyBy(_.length)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[42] at keyBy at <console>:26

scala> rdd2.collect
res14: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
           

     12. keys, values: extract the keys or the values of a pair RDD into a new RDD.

scala> val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[43] at parallelize at <console>:24

scala> rdd1.keys.collect
res16: Array[String] = Array(e, c, d, c, a)

scala> rdd1.values.collect
res17: Array[Int] = Array(5, 3, 4, 2, 1)
           
