現要求統計使用者收藏資料中,每個使用者收藏商品數量。
1.在Linux上,建立/data/spark3/wordcount目錄,用于存儲實驗所需的資料。
view plain copy
mkdir -p /data/spark3/wordcount
切換目錄到/data/spark3/wordcount下,并從http://192.168.1.100:60000/allfiles/spark3/wordcount/buyer_favorite下載下傳實驗資料。
view plain copy
cd /data/spark3/wordcount
wget http://192.168.1.100:60000/allfiles/spark3/wordcount/buyer_favorite
2.使用jps檢視Hadoop以及Spark的相關程序是否已經啟動,若未啟動則執行啟動指令。
view plain copy
jps
/apps/hadoop/sbin/start-all.sh
view plain copy
/apps/spark/sbin/start-all.sh
将Linux本地/data/spark3/wordcount/buyer_favorite檔案,上傳到HDFS上的/myspark3/wordcount目錄下。若HDFS上/myspark3目錄不存在則需提前建立。
view plain copy
hadoop fs -mkdir -p /myspark3/wordcount
hadoop fs -put /data/spark3/wordcount/buyer_favorite /myspark3/wordcount
3.啟動spark-shell
view plain copy
spark-shell
4.編寫Scala語句,統計使用者收藏資料中,每個使用者收藏商品數量。
先在spark-shell中,加載資料。
view plain copy
val rdd = sc.textFile(“hdfs://localhost:9000/myspark3/wordcount/buyer_favorite”);
執行統計并輸出。
view plain copy
rdd.map(line=> (line.split(’\t’)(0),1)).reduceByKey(+).collect
去重:使用spark-shell,對上述實驗中,使用者收藏資料檔案進行統計。根據商品ID進行去重,統計使用者收藏資料中都有哪些商品被收藏。
1.在Linux上,建立/data/spark3/distinct,用于存儲實驗資料。
view plain copy
mkdir -p /data/spark3/distinct
切換到/data/spark3/distinct目錄下,并從http://192.168.1.100:60000/allfiles/spark3/distinct/buyer_favorite下載下傳實驗資料。
view plain copy
cd /data/spark3/distinct
wget http://192.168.1.100:60000/allfiles/spark3/distinct/buyer_favorite
2.使用jps檢視Hadoop,Spark的程序。保證Hadoop、Spark架構相關程序為已啟動狀态。
3.将/data/spark3/distinct/buyer_favorite檔案,上傳到HDFS上的/myspark3/distinct目錄下。若HDFS目錄不存在則建立。
view plain copy
hadoop fs -mkdir -p /myspark3/distinct
hadoop fs -put /data/spark3/distinct/buyer_favorite /myspark3/distinct
4.在Spark視窗,編寫Scala語句,統計使用者收藏資料中,都有哪些商品被收藏。
先加載資料,建立RDD。
view plain copy
val rdd = sc.textFile(“hdfs://localhost:9000/myspark3/distinct/buyer_favorite”);
對RDD進行統計并将結果列印輸出。
view plain copy
rdd.map(line => line.split(’\t’)(1)).distinct.collect
排序:電商網站都會對商品的通路情況進行統計,現有一個goods_visit檔案,存儲了電商網站中的各種商品以及此各個商品的點選次數。
商品id(goods_id) 點選次數(click_num)
view plain copy
商品ID 點選次數
1010037 100
1010102 100
1010152 97
1010178 96
1010280 104
1010320 103
1010510 104
1010603 96
1010637 97
現根據商品的點選次數進行排序,并輸出所有商品。
輸出結果樣式:
view plain copy
點選次數 商品ID
96 1010603
96 1010178
97 1010637
97 1010152
100 1010102
100 1010037
103 1010320
104 1010510
104 1010280
1.在Linux上,建立/data/spark3/sort,用于存儲實驗資料。
view plain copy
mkdir -p /data/spark3/sort
切換到/data/spark3/sort目錄下,并從http://192.168.1.100:60000/allfiles/spark3/sort/goods_visit下載下傳實驗資料。
view plain copy
cd /data/spark3/sort
wget http://192.168.1.100:60000/allfiles/spark3/sort/goods_visit
2.将/data/spark3/sort/goods_visit檔案,上傳到HDFS上的/spark3/sort/目錄下。若HDFS目錄不存在則需提前建立。
view plain copy
hadoop fs -mkdir -p /myspark3/sort
hadoop fs -put /data/spark3/sort/goods_visit /myspark3/sort
3.在Spark視窗,加載資料,将資料轉變為RDD。
view plain copy
val rdd1 = sc.textFile(“hdfs://localhost:9000/myspark3/sort/goods_visit”);
對RDD進行統計并将結果列印輸出。
view plain copy
rdd1.map(line => ( line.split(’\t’)(1).toInt, line.split(’\t’)(0) ) ).sortByKey(true).collect
4.輸出結果樣式為:
Join:現有某電商在2011年12月15日的部分交易資料。資料有訂單表orders和訂單明細表order_items,表結構及資料分别為:
orders表:(訂單id order_id, 訂單号 order_number, 買家ID buyer_id, 下單日期 create_dt)
view plain copy
訂單ID 訂單号 使用者ID 下單日期
52304 111215052630 176474 2011-12-15 04:58:21
52303 111215052629 178350 2011-12-15 04:45:31
52302 111215052628 172296 2011-12-15 03:12:23
52301 111215052627 178348 2011-12-15 02:37:32
52300 111215052626 174893 2011-12-15 02:18:56
52299 111215052625 169471 2011-12-15 01:33:46
52298 111215052624 178345 2011-12-15 01:04:41
52297 111215052623 176369 2011-12-15 01:02:20
52296 111215052622 178343 2011-12-15 00:38:02
52295 111215052621 178342 2011-12-15 00:18:43
52294 111215052620 178341 2011-12-15 00:14:37
52293 111215052619 178338 2011-12-15 00:13:07
order_items表:(明細ID item_id, 訂單ID order_id, 商品ID goods_id )
view plain copy
明細ID 訂單ID 商品ID
252578 52293 1016840
252579 52293 1014040
252580 52294 1014200
252581 52294 1001012
252582 52294 1022245
252583 52294 1014724
252584 52294 1010731
252586 52295 1023399
252587 52295 1016840
252592 52296 1021134
252593 52296 1021133
252585 52295 1021840
252588 52295 1014040
252589 52296 1014040
252590 52296 1019043
orders表和order_items表,通過訂單id進行關聯,是一對多的關系。
下面開啟spark-shell,查詢在當天該電商網站,都有哪些使用者購買了什麼商品。
1.在Linux上,建立/data/spark3/join,用于存儲實驗資料。
view plain copy
mkdir -p /data/spark3/join
切換目錄到/data/spark3/join目錄下,并從http://192.168.1.100:60000/allfiles/spark3/join/order_items及http://192.168.1.100:60000/allfiles/spark3/join/orders下載下傳實驗資料。
view plain copy
cd /data/spark3/join
wget http://192.168.1.100:60000/allfiles/spark3/join/order_items
wget http://192.168.1.100:60000/allfiles/spark3/join/orders
2.在HDFS上建立/myspark3/join目錄,并将Linux上/data/spark3/join目錄下的資料,上傳到HDFS。
view plain copy
hadoop fs -mkdir -p /myspark3/join
hadoop fs -put /data/spark3/join/orders /myspark3/join
hadoop fs -put /data/spark3/join/order_items /myspark3/join
3.在Spark視窗建立兩個RDD,分别加載orders檔案以及order_items檔案中的資料。
view plain copy
val rdd1 = sc.textFile(“hdfs://localhost:9000/myspark3/join/orders”);
val rdd2 = sc.textFile(“hdfs://localhost:9000/myspark3/join/order_items”);
4.我們的目的是查詢每個使用者購買了什麼商品。是以對rdd1和rdd2進行map映射,得出關鍵的兩個列的資料。
view plain copy
val rdd11 = rdd1.map(line=> (line.split(’\t’)(0), line.split(’\t’)(2)) )
val rdd22 = rdd2.map(line=> (line.split(’\t’)(1), line.split(’\t’)(2)) )
5.将rdd11以及rdd22中的資料,根據Key值,進行Join關聯,得到最終結果。
view plain copy
val rddresult = rdd11 join rdd22
6.最後将結果輸出,檢視輸出效果。
view plain copy
rddresult.collect
最終的執行結果為:
7.将輸出資料進行格式化:
view plain copy
(52294,(178341,1014200)),
(52294,(178341,1001012)),
(52294,(178341,1022245)),
(52294,(178341,1014724)),
(52294,(178341,1010731)),
(52296,(178343,1021134)),
(52296,(178343,1021133)),
(52296,(178343,1014040)),
(52296,(178343,1019043)),
(52295,(178342,1023399)),
(52295,(178342,1016840)),
(52295,(178342,1021840)),
(52295,(178342,1014040)),
(52293,(178338,1016840)),
(52293,(178338,1014040))
可以看到上面資料關聯後一共有3列,分别為訂單ID,使用者ID,商品ID。
求平均值:電商網站都會對商品的通路情況進行統計。現有一個goods_visit檔案,存儲了全部商品及各商品的點選次數。還有一個檔案goods,記錄了商品的基本資訊。兩張表的資料結構如下:
goods表:商品ID(goods_id),商品狀态(goods_status),商品分類id(cat_id),評分(goods_score)
goods_visit表:商品ID(goods_id),商品點選次數(click_num)
商品表(goods)及商品通路情況表(goods_visit)可以根據商品id進行關聯。現在統計每個分類下,商品的平均點選次數是多少?
1.在Linux上,建立目錄/data/spark3/avg,用于存儲實驗資料。
view plain copy
mkdir -p /data/spark3/avg
切換到/data/spark3/avg目錄下,并從http://192.168.1.100:60000/allfiles/spark3/avg/goods以及http://192.168.1.100:60000/allfiles/spark3/avg/goods_visit兩個網址下載下傳實驗資料。
view plain copy
cd /data/spark3/avg
wget http://192.168.1.100:60000/allfiles/spark3/avg/goods
wget http://192.168.1.100:60000/allfiles/spark3/avg/goods_visit
2.在HDFS上建立目錄/myspark3/avg,并将Linux/data/spark3/avg目錄下的資料,上傳到HDFS的/myspark3/avg。
view plain copy
hadoop fs -mkdir -p /myspark3/avg
hadoop fs -put /data/spark3/avg/goods /myspark3/avg
hadoop fs -put /data/spark3/avg/goods_visit /myspark3/avg
3.在Spark視窗建立兩個RDD,分别加載goods檔案以及goods_visit檔案中的資料。
view plain copy
val rdd1 = sc.textFile(“hdfs://localhost:9000/myspark3/avg/goods”)
val rdd2 = sc.textFile(“hdfs://localhost:9000/myspark3/avg/goods_visit”)
4.我們的目的是統計每個分類下,商品的平均點選次數,我們可以分三步來做。
首先,對rdd1和rdd2進行map映射,得出關鍵的兩個列的資料。
view plain copy
val rdd11 = rdd1.map(line=> (line.split(’\t’)(0), line.split(’\t’)(2)) )
val rdd22 = rdd2.map(line=> (line.split(’\t’)(0), line.split(’\t’)(1)) )
用collect()方法啟動程式。
view plain copy
rdd11.collect
檢視rdd11的結果如下:
view plain copy
rdd11.collect
res2: Array[(String, String)] = Array((1000002,52137), (1000003,52137), (1000004,52137), (1000006,52137),
(1000007,52137), (1000008,52137), (1000010,52137), (1000011,52137), (1000015,52137), (1000018,52137),
(1000020,52137), (1000021,52137), (1000025,52137), (1000028,52137), (1000030,52137), (1000033,52137),
(1000035,52137), (1000037,52137), (1000041,52137), (1000044,52137), (1000048,52137), (1000050,52137),
(1000053,52137), (1000057,52137), (1000059,52137), (1000063,52137), (1000065,52137), (1000067,52137),
(1000071,52137), (1000073,52137), (1000076,52137), (1000078,52137), (1000080,52137), (1000082,52137),
(1000084,52137), (1000086,52137), (1000087,52137), (1000088,52137), (1000090,52137), (1000091,52137),
(1000094,52137), (1000098,52137), (1000101,52137), (1000103,52137), (1000106,52…
scala>>
用collect()方法啟動程式。
view plain copy
rdd22.collect
檢視rdd22的結果如下:
view plain copy
rdd22.collect
res3: Array[(String, String)] = Array((1010000,4), (1010001,0), (1010002,0), (1010003,0), (1010004,0),
(1010005,0), (1010006,74), (1010007,0), (1010008,0), (1010009,1081), (1010010,0), (1010011,0), (1010012,0),
(1010013,44), (1010014,1), (1010018,0), (1010019,542), (1010020,1395), (1010021,18), (1010022,13), (1010023,27),
(1010024,22), (1010025,295), (1010026,13), (1010027,1), (1010028,410), (1010029,2), (1010030,8), (1010031,6),
(1010032,729), (1010033,72), (1010034,3), (1010035,328), (1010036,153), (1010037,100), (1010038,4), (1010039,3),
(1010040,69), (1010041,1), (1010042,1), (1010043,21), (1010044,268), (1010045,11), (1010046,1), (1010047,1),
(1010048,59), (1010049,15), (1010050,19), (1010051,424), (1010052,462), (1010053,9), (1010054,41), (1010055,64),
(1010056,10), (1010057,3), (…
scala>
然後,将rdd11以及rdd22中的資料根據商品ID,也就是key值進行關聯,得到一張大表。表結構變為:(商品id,(商品分類,商品點選次數))
view plain copy
val rddjoin = rdd11 join rdd22
用collect()方法啟動程式。
view plain copy
rddjoin.collect
檢視rddjoin的結果如下:
view plain copy
rddjoin.collect
res4: Array[(String, (String, String))] = Array((1013900,(52137,0)), (1010068,(52007,1316)), (1018970,(52006,788)),
(1020975,(52091,68)), (1019960,(52111,0)), (1019667,(52045,16)), (1010800,(52137,6)), (1019229,(52137,20)), (1022649,
(52119,90)), (1020382,(52137,0)), (1022667,(52021,150)), (1017258,(52086,0)), (1021963,(52072,83)), (1015809,(52137,285)),
(1024340,(52084,0)), (1011043,(52132,0)), (1011762,(52137,2)), (1010976,(52132,34)), (1010512,(52090,8)), (1023965,(52095,0)),
(1017285,(52069,41)), (1020212,(52026,46)), (1010743,(52137,0)), (1020524,(52064,52)), (1022577,(52090,13)), (1021974,(52069,22)),
(1010543,(52137,0)), (1010598,(52136,53)), (1017212,(52108,45)), (1010035,(52006,328)), (1010947,(52089,8)), (1020964,(52071,86)),
(1024001,(52063,0)), (1020191,(52046,0)), (1015739,(…
scala>
最後,在大表的基礎上,進行統計。得到每個分類,商品的平均點選次數。
view plain copy
rddjoin.map(x=>{(x._2._1, (x._2._2.toLong, 1))}).reduceByKey((x,y)=>{(x._1+y._1, x._2+y._2)}).map(x=>{(x._1, x._2._1*1.0/x._2._2)}).collect
将結果輸出,檢視輸出效果。
view plain copy
scala> rddjoin.map(x=>{(x._2._1, (x._2._2.toLong, 1))}).reduceByKey((x,y)=>{(x._1+y._1, x._2+y._2)}).map(x=>
{(x._1, x._2._1*1.0/x._2._2)}).collect
res40: Array[(String, Double)] = Array((52009,463.3642857142857), (52135,36.69230769230769), (52128,9.0), (52072,42.8),
(52078,16.5), (52137,34.735241502683365), (52047,20.96551724137931), (52050,0.0), (52056,24.57894736842105),
(52087,17.008928571428573), (52085,31.17142857142857), (52007,547.3076923076923), (52052,19.6), (52081,50.833333333333336),
(52016,106.75), (52058,34.23170731707317), (52124,0.0), (52092,28.453703703703702), (52065,8.644444444444444), (52106,22.5),
(52120,96.7843137254902), (52027,114.7), (52089,17.81159420289855), (52098,57.793103448275865), (52038,74.2), (52061,52.609375),
(52104,49.0), (52014,45.4), (52012,53.26), (52100,22.0), (52043,23.0), (52030,532.48), (52023,150.0), (52083,57.857142857142854),
(52041,40.0), (52049,18.058823529411764), (52074,33.17647058…
scala>