天天看点

Spark常用的RDD

1. Tranformation

val lines=sc.textFile(file:///usr/local/spark/mycode/rdd/word.txt)

### #1. map    map(func)  将每个元素传递给函数 func 中,并将返回结果返回为一个新的数据集

scala> val data=Array(1,2,3,4,5)
scala> val rdd1 = sc.parallelize(data)
scala> val rdd2=rdd1.map(x => x+10)
scala> val words=lines.map(line => line.split(""))  //最后得到的是多个数组

### 2. flatmap  flatMap(func) 与map()相似,将map 的输出结果进一步的压平,每个输入元素可以映射到多个输出结果
flatMap 的执行过程: 首先是 map ,得到三个 Array数组,再将每个数组拍平,所有的数据混合到一起。9
scala> val words=lines.flatmap(line => line.split(""))  ```

3. filter   filter(func)将每个元素传递给函数 func 中,并返回一个新的数据集
val linesWithSpark = lines.filter(line => line.contains("spark"))

4. groupByKey  groupByKey()  应用于 (k,v) 键值对的数据集,返回一个新的(k,Iterable)形式的数据集
# rdd = ("hadoop",1),("spark",1),("hadoop",1),("hive",1)
rdd.groupByKey()   //("hadoop",(1,1)) ("spark",1) ("hive",1)

5. reduceByKey reduceByKey()  应用于 (k,v) 键值对的数据集时,返回一个新的 (k,v) 形式的数据集,其中根据Key 对每个值再进行函数 func 进行操作,
# rdd = ("hadoop",1),("spark",1),("hadoop",1),("hive",1)
rdd.reduceByKey((a,b) => a+b)   //("hadoop",(2)) ("spark",1) ("hive",1)

6. groupByKey 与 reduceByKey 有什么区别
   groupByKey 只会进行分组
   reduceByKey 分组并将结果根据聚合函数进行汇总
# rdd = ("hadoop",1),("spark",1),("hadoop",1),("hive",1)
val groupRdd=rdd.groupByKey()   //("hadoop",(1,1)) ("spark",1) ("hive",1)
groupRdd.map(t=>t._1,t._2.sum())  //("hadoop",(2)) ("spark",1) ("hive",1)
rdd.reduceByKey((a,b => a+b))   //("hadoop",(2)) ("spark",1) ("hive",1)

2. Action
// Parallelize 与 makeRDD 是一致的
val rdd=sc.parallelize(Aray(1,2,3,4,5))
val rdd02=sc.makeRDD(Array(1,23,34,5)))

1. count() 统计数据集中元素个数
rdd.count()   //5

2. collect()   以数组的形式返回数据集中所有元素,否则可能散布在不同的机器上
   如果该RDD 为 empty ,则会抛出异常 java.lang.UnsupportedOperationException: empty collection
rdd.collect()  //Array[1,2,3,4,5]

3. first()   返回第一个元素
rdd.first()   //1

4. take(n)  返回前 n个元素
rdd.take(2)   //Array[1,2]

5. reduce(func) 通过函数 func (输入两个参数并返回一个值)  聚合数据集中的元素
rdd.reduce((a,b) => a+b)    //1+2+3+4+5 =15

6. foreach(func)  将数据集中的每个元素传递到函数 func 中运行
rdd.foreach(elem => println(elem))  //1 2 3 4 5

3.键值对RDD/PairRDD
RDD 一般分为数值RDD和键值RDD
所有的RDD都是延迟加载的,只有遇到真正的Action才会进行计算(只有当发生一个要求返回结果给Driver的动作,这些转化才会执行)。
键值RDD 常见的操作有 reduceByKey() | groupByKey() | keys() | values() | combineByKey() | join() | mapValues(func) |sortByKey()
1. keys()  将 PairRDD 中的 key 返回
scala> rdd = (("hadoop",1),("spark",1),("hadoop",1),("hive",1))
scala> rdd.keys().foreach(println)   // hadoop,spark,hadoop,hive 

2. values()  将 PairRDD 中的 value 返回
scala> rdd.values().foreach(println)   // 1,1,1,1 

3. sortByKey()  将 PairRDD 中的 value 按照 Key 进行排序  ,默认是按照升序排序,sortByKey(false) 按照降序排
  sortBy() 可以按照 value 进行排序
scala> rdd =sc.parallelize(Array(("a",1),("c",2),("d",5),("b",95),("d",2)))
scala> rdd.reduceByKey(_+_).sortByKey(flase).collect //Array(("d",7),("b",95),("c",2))
scala> rdd.reduceByKey(_+_).sortBy(_._2,false).collect //Array(("b",95),("d",7),("c",2))

4. mapValues(func)  将 PairRDD 中的 key不变,对 value  进行 函数
scala> rdd = ("hadoop",1),("spark",1),("hadoop",1),("hive",1)
scala> rdd.mapValue(x=>x+1).foreach(println) // ("hadoop",2),("spark",2),("hadoop",2),("hive",2)

5. join() 把几个RDD当中元素 key 相同的进行连接
scala> rdd = ("hadoop",1),("spark",1),("hadoop",2),("hive",1)
scala> rdd2 = ("hadoop","new")
scala> rdd.join(rdd2).foreach(println)  // ("hadoop",(1,new)),("hadoop",(2,new))

4.惰性机制
将 transformation  的轨迹记录下来,只有遇到 action 才会真正的去执行
只有在遇到action时,才会操作,之前都只是记录轨迹。
5.WordCount 实例
scala> val lines=sc.textFile("file:///words.txt")
scala> val wordcount=lines.flatMap(line=> line.split(""))
     | .map(word =>(word,1))         //或者 map(_,1)
     | .reduceByKey((a,b) => a+b)    //或者 reduceByKey(_+_)
scala> wordcount.collect()
scala> wordcount.foreach(println)

6.PairRDD之间的转化操作实例
key  : 图书名称
value : 某天图书的销量
求 每种图书每天的平均销量
scala> val rdd=sc.parallelize(Array(("spark",2),("hadoop",3),("hive",1),("spark",1)))
//("BookName",(sales,每天))   reduceByKey 之后 ("BookName",(TotalSales,TotalDay)) 再进行mapValues()  ("BookName",everyDayAvgSales) 
scala> rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>x._1 / x._2).collect() 
           

继续阅读