天天看点

Spark RDD编程基础

一、数据读写

(1)从文件系统加载数据创建RDD

  ①本地文件:sc.textFile("file:///data/spark/buyer_favorite")

  ②HDFS文件:sc.textFile("hdfs://localhost:9000/spark/buyer_favorite")

(2)通过并行集合创建RDD

  val array = Array(1,2,3,4,5)

  val rdd = sc.parallelize(array)

(3)将RDD写入到文本文件

   使用saveAsTextFile()函数,要求提供一个不存在的目录名称

  *使用repartition重新设置分区个数

二、RDD常见操作

(1)转换操作

①  filter(func):筛选出满足函数func的元素,并返回一个新的数据集。

②  map(func):将每一个元素传递到函数func中,并将结果返回为一个新的数据集。

③  flatMap(func):类似map(),但是每个输入都可以映射0到多个输出结果。可以理解为:

    第一步:执行map()

    第二步:把map操作得到的数组中每个元素“拍扁”(flat)

④  groupByKey():对具有相同键的值进行分组,返回一个(key,Iterable)形式数据集。

⑤  reduceByKey(func):先执行groupByKey()操作得到(key,value-list),根据func对value-list进行操作。

(2)行动操作

①  count():返回数据集中元素个数。

②  collect():以数组的形式返回数据集中的所有元素。

③  first():返回数据集第一个元素。

④  take(n):以数组的形式返回数据集中的前n个元素。

⑤  reduce(func):通过函数func(输入两个参数,返回一个值)聚合数据集中的元素。

⑥  foreach(func):将数据集中每个元素传递到func中运行。

三、键值对RDD

(1)创建键值对RDD

①val lines = sc.textTextFile(file:///usr/local/word.txt)

  var pairRDD=lines.flatMap(_.split("\t")).map(word=>(word,1))

②var pairRDD=rdd.map(word=>(word,1))

(2)常用键值对RDD操作

reduceByKey(func)

groupByKey()

keys:返回一个新的RDD.

values

sortByKey()

sortBy()

mapValues(func)

join():内连接,对于给定的两个数据集(k,v1)和(k,v2),当两个数据集都存在key才会被输出,最注重得到一个(k,(v1,v2))类型的数据集。

四、几个简单例子

(1)

WordCount统计:某电商网站记录了大量的用户对商品的收藏数据,并将数据存储在名为buyer_favorite的文本文件中。文本数据格式如下:

用户id(buyer_id),商品id(goods_id),收藏日期(dt);现要求统计用户收藏数据中,每个用户收藏商品数量。

rdd.map(line=> (line.split('\t')(0),1)).reduceByKey(_+_).collect

(2)

去重:使用spark-shell,对上述实验中,用户收藏数据文件进行统计。根据商品ID进行去重,统计用户收藏数据中都有哪些商品被收藏。

rdd.map(line => line.split('\t')(1)).distinct.collect      相当于

rdd.map(line=>(line.split('\t')),1).reduceByKey(_+_).map(_._1).collect

(3)排序:电商网站都会对商品的访问情况进行统计,现有一个goods_visit文件,存储了电商网站中的各种商品以及此各个商品的点击次数。

商品id(goods_id) 点击次数(click_num)现根据商品的点击次数进行排序,并输出所有商品。

rdd1.map(line => ( line.split('\t')(1).toInt, line.split('\t')(0) ) ).sortByKey(true).map(_._2).collect

(4)Join:现有某电商在2011年12月15日的部分交易数据。数据有订单表orders和订单明细表order_items,表结构及数据分别为:

orders表:(订单id order_id, 订单号 order_number, 买家ID buyer_id, 下单日期 create_dt)

order_items表:(明细ID item_id, 订单ID order_id, 商品ID goods_id )

orders表和order_items表,通过订单id进行关联,是一对多的关系。

下面开启spark-shell,查询在当天该电商网站,都有哪些用户购买了什么商品

rdd1.map(line=> (line.split('\t')(0), line.split('\t')(2)) ) 

    join

rdd2.map(line=> (line.split('\t')(1), line.split('\t')(2)) )

     .collect

(5)求平均值:电商网站都会对商品的访问情况进行统计。现有一个goods_visit文件,存储了全部商品及各商品的点击次数。还有一个文件goods,记录了商品的基本信息。两张表的数据结构如下:

goods表:商品ID(goods_id),商品状态(goods_status),商品分类id(cat_id),评分(goods_score)

goods_visit表:商品ID(goods_id),商品点击次数(click_num)

商品表(goods)及商品访问情况表(goods_visit)可以根据商品id进行关联。现在统计每个分类下,商品的平均点击次数是多少?

val rdd11 = rdd1.map(line=> (line.split('\t')(0), line.split('\t')(2)) )  

val rdd22 = rdd2.map(line=> (line.split('\t')(0), line.split('\t')(1)) )  

val rddjoin = rdd11 join rdd22  

rddjoin.map(x=>{(x._2._1, (x._2._2.toLong, 1))}).reduceByKey((x,y)=>{(x._1+y._1, x._2+y._2)}).map(x=>  

{(x._1, x._2._1*1.0/x._2._2)}).collect

继续阅读