天天看点

Spark-shell例子

//parallelize演示(并行化scala的数据集)
val num=sc.parallelize(1 to 10) //将数组并行化成RDD,默认分片
val doublenum=num.map(_*2)  //每个元素*2
val threenum=doublenum.filter(_%3==0)   //过滤出能整除3的元素
//Action触发job的运行
threenum.collect
threenum.toDebugString

//并行化时,并进行分片
val num1=sc.parallelize(1 to 10,6)
val doublenum1=num1.map(_*2)
val threenum1=doublenum1.filter(_ % 3 == 0)
threenum1.collect
threenum1.toDebugString

threenum.cache()
val fournum=threenum.map(x=>x*x)
fournum.collect
fournum.toDebugString
threenum.unpersist() //删除掉cache

num.reduce(_+_) //元素累教
num.take(5) //选择前5个
num.first  //第一个元素
num.count   //计数
num.take(5).foreach(println)//前5个数打印出来           
//K-V演示
val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
//按key进行排序
kv1.sortByKey().collect //注意sortByKey的小括号不能省
//按key分组,不做合并
kv1.groupByKey().collect
==>res13: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(2, 5)), 
(A,CompactBuffer(1, 4)), (C,CompactBuffer(3)))

//按key分组,会做合并
kv1.reduceByKey(_+_).collect
==>res15: Array[(String, Int)] = Array((B,7), (A,5), (C,3))

val kv2=sc.parallelize(List(("A",1),("A",4),("C",3),("A",4),("B",5)))
//去重
kv2.distinct.collect
==>res17: Array[(String, Int)] = Array((A,4), (A,1), (B,5), (C,3))
//合并
kv1.union(kv2).collect
==>res18: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4), (B,5), (A,1), (A,4), (C,3), (A,4), (B,5))



val kv3=sc.parallelize(List(("A",10),("B",20),("D",30)))
kv1.join(kv3).collect
==>res21: Array[(String, (Int, Int))] = Array((B,(2,20)), (B,(5,20)), (A,(1,10)), (A,(4,10)))


kv1.cogroup(kv3).collect
==>res22: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((B,(CompactBuffer(2, 5),
CompactBuffer(20))), (D,(CompactBuffer(),CompactBuffer(30))), (A,(CompactBuffer(1, 4),CompactBuffer(10))),
 (C,(CompactBuffer(3),CompactBuffer())))


val kv4=sc.parallelize(List(List(1,2),List(3,4)))
kv4.flatMap(x=>x.map(_+1)).collect
==>res23: Array[Int] = Array(2, 3, 4, 5)           
//文件读取演示
val rdd1=sc.textFile("hdfs://localhost:9000/liguodong/test.txt")
val words=rdd1.flatMap(_.split(" "))
val wordscount=words.map(x=>(x,1)).reduceByKey(_+_)
wordscount.collect
wordscount.toDebugString

val rdd2=sc.textFile("hdfs://localhost:9000/liguodong/*.txt")
rdd2.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect

//读取压缩文件
val rdd3=sc.textFile("hdfs://localhost:9000/liguodong/oo.txt.gz")
rdd3.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect
           
//日志处理演示
//下载地址:http://download.labs.sogou.com/dl/q.html 完整版(2GB):gz格式
//访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL
//SogouQ1.txt、SogouQ2.txt、SogouQ3.txt分别是用head -n 或者tail -n 从SogouQ数据日志文件中截取

//搜索结果排名第1,但是点击次序排在第2的数据有多少?
val rdd1 = sc.textFile("hdfs://localhost:9000/liguodong/SogouQ1.txt.tar.gz")
val rdd2=rdd1.map(_.split("\t")).filter(_.length==6)
rdd2.count()
val rdd3=rdd2.filter(_(3).toInt==1).filter(_(4).toInt==2)
rdd3.count()
rdd3.toDebugString

//session查询次数排行榜 
//排序是true表示升序,flase表示降序
val rdd4=rdd2.map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
rdd4.toDebugString
rdd4.saveAsTextFile("hdfs://localhost:9000/liguodong/output1")

//数据的文件可能有多个,我们可以进行合并输出到本地
hadoop2@ubuntu:/liguodong/software/hadoop$ bin/hdfs dfs -getmerge hdfs://localhost:9000/liguodong/output1 
/home/hadoop2/liguodong/result

hadoop2@ubuntu:/liguodong/software/hadoop$ ll /home/hadoop2/liguodong/result
-rw-r--r-- 1 hadoop2 hadoop 10477740 Dec 22 16:07 /home/hadoop2/liguodong/result

hadoop2@ubuntu:/liguodong/software/hadoop$ head /home/hadoop2/liguodong/result
(b3c94c37fb154d46c30a360c7941ff7e,676)
(cc7063efc64510c20bcdd604e12a3b26,613)
(955c6390c02797b3558ba223b8201915,391)
(b1e371de5729cdda9270b7ad09484c4f,337)
(6056710d9eafa569ddc800fe24643051,277)
(637b29b47fed3853e117aa7009a4b621,266)
(c9f4ff7790d0615f6f66b410673e3124,231)
(dca9034de17f6c34cfd56db13ce39f1c,226)
(82e53ddb484e632437039048c5901608,221)
(c72ce1164bcd263ba1f69292abdfdf7c,214)





//join演示
val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
case class Register (d: java.util.Date, uuid: String, cust_id: String, lat: Float,lng: Float)
case class Click (d: java.util.Date, uuid: String, landing_page: Int)
val reg = sc.textFile("hdfs://localhost:9000/liguodong/join/reg.tsv").map(_.split("\t")).map(r =>
 (r(1), Register(format.parse(r(0)), r(1), r(2), r(3).toFloat, r(4).toFloat)))
val clk = sc.textFile("hdfs://localhost:9000/liguodong/join/clk.tsv").map(_.split("\t")).map(c =>
 (c(1), Click(format.parse(c(0)), c(1), c(2).trim.toInt)))

reg.join(clk).take(2)

==>res34: Array[(String, (Register, Click))] = Array((81da510acc4111e387f3600308919594,
(Register(Tue Mar 04 00:00:00 CST 2014,81da510acc4111e387f3600308919594,2,33.85701,-117.85574),
Click(Thu Mar 06 00:00:00 CST 2014,81da510acc4111e387f3600308919594,61))), 
(15dfb8e6cc4111e3a5bb600308919594,
(Register(Sun Mar 02 00:00:00 CST 2014,15dfb8e6cc4111e3a5bb600308919594,1,33.659943,-117.95812),
Click(Tue Mar 04 00:00:00 CST 2014,15dfb8e6cc4111e3a5bb600308919594,11))))


//cache()演示
//检查block命令:bin/hdfs fsck /dataguru/data/SogouQ3.txt -files -blocks -locations
val rdd5 = sc.textFile("hdfs://localhost:9000/liguodong/SogouQ1.txt")
rdd5.cache()
rdd5.count()

rdd5.count()  //比较时间           

继续阅读