天天看點

spark-shell

現要求統計使用者收藏資料中,每個使用者收藏商品數量。

1.在Linux上,建立/data/spark3/wordcount目錄,用于存儲實驗所需的資料。

view plain copy

mkdir -p /data/spark3/wordcount

切換目錄到/data/spark3/wordcount下,并從http://192.168.1.100:60000/allfiles/spark3/wordcount/buyer_favorite下載下傳實驗資料。

view plain copy

cd /data/spark3/wordcount

wget http://192.168.1.100:60000/allfiles/spark3/wordcount/buyer_favorite

2.使用jps檢視Hadoop以及Spark的相關程序是否已經啟動,若未啟動則執行啟動指令。

view plain copy

jps

/apps/hadoop/sbin/start-all.sh

view plain copy

/apps/spark/sbin/start-all.sh

将Linux本地/data/spark3/wordcount/buyer_favorite檔案,上傳到HDFS上的/myspark3/wordcount目錄下。若HDFS上/myspark3目錄不存在則需提前建立。

view plain copy

hadoop fs -mkdir -p /myspark3/wordcount

hadoop fs -put /data/spark3/wordcount/buyer_favorite /myspark3/wordcount

3.啟動spark-shell

view plain copy

spark-shell

4.編寫Scala語句,統計使用者收藏資料中,每個使用者收藏商品數量。

先在spark-shell中,加載資料。

view plain copy

val rdd = sc.textFile(“hdfs://localhost:9000/myspark3/wordcount/buyer_favorite”);

執行統計并輸出。

view plain copy

rdd.map(line=> (line.split(’\t’)(0),1)).reduceByKey(+).collect

去重:使用spark-shell,對上述實驗中,使用者收藏資料檔案進行統計。根據商品ID進行去重,統計使用者收藏資料中都有哪些商品被收藏。

1.在Linux上,建立/data/spark3/distinct,用于存儲實驗資料。

view plain copy

mkdir -p /data/spark3/distinct

切換到/data/spark3/distinct目錄下,并從http://192.168.1.100:60000/allfiles/spark3/distinct/buyer_favorite下載下傳實驗資料。

view plain copy

cd /data/spark3/distinct

wget http://192.168.1.100:60000/allfiles/spark3/distinct/buyer_favorite

2.使用jps檢視Hadoop,Spark的程序。保證Hadoop、Spark架構相關程序為已啟動狀态。

3.将/data/spark3/distinct/buyer_favorite檔案,上傳到HDFS上的/myspark3/distinct目錄下。若HDFS目錄不存在則建立。

view plain copy

hadoop fs -mkdir -p /myspark3/distinct

hadoop fs -put /data/spark3/distinct/buyer_favorite /myspark3/distinct

4.在Spark視窗,編寫Scala語句,統計使用者收藏資料中,都有哪些商品被收藏。

先加載資料,建立RDD。

view plain copy

val rdd = sc.textFile(“hdfs://localhost:9000/myspark3/distinct/buyer_favorite”);

對RDD進行統計并将結果列印輸出。

view plain copy

rdd.map(line => line.split(’\t’)(1)).distinct.collect

排序:電商網站都會對商品的通路情況進行統計,現有一個goods_visit檔案,存儲了電商網站中的各種商品以及此各個商品的點選次數。

商品id(goods_id) 點選次數(click_num)

view plain copy

商品ID 點選次數

1010037 100

1010102 100

1010152 97

1010178 96

1010280 104

1010320 103

1010510 104

1010603 96

1010637 97

現根據商品的點選次數進行排序,并輸出所有商品。

輸出結果樣式:

view plain copy

點選次數 商品ID

96 1010603

96 1010178

97 1010637

97 1010152

100 1010102

100 1010037

103 1010320

104 1010510

104 1010280

1.在Linux上,建立/data/spark3/sort,用于存儲實驗資料。

view plain copy

mkdir -p /data/spark3/sort

切換到/data/spark3/sort目錄下,并從http://192.168.1.100:60000/allfiles/spark3/sort/goods_visit下載下傳實驗資料。

view plain copy

cd /data/spark3/sort

wget http://192.168.1.100:60000/allfiles/spark3/sort/goods_visit

2.将/data/spark3/sort/goods_visit檔案,上傳到HDFS上的/spark3/sort/目錄下。若HDFS目錄不存在則需提前建立。

view plain copy

hadoop fs -mkdir -p /myspark3/sort

hadoop fs -put /data/spark3/sort/goods_visit /myspark3/sort

3.在Spark視窗,加載資料,将資料轉變為RDD。

view plain copy

val rdd1 = sc.textFile(“hdfs://localhost:9000/myspark3/sort/goods_visit”);

對RDD進行統計并将結果列印輸出。

view plain copy

rdd1.map(line => ( line.split(’\t’)(1).toInt, line.split(’\t’)(0) ) ).sortByKey(true).collect

4.輸出結果樣式為:

Join:現有某電商在2011年12月15日的部分交易資料。資料有訂單表orders和訂單明細表order_items,表結構及資料分别為:

orders表:(訂單id order_id, 訂單号 order_number, 買家ID buyer_id, 下單日期 create_dt)

view plain copy

訂單ID 訂單号 使用者ID 下單日期

52304 111215052630 176474 2011-12-15 04:58:21

52303 111215052629 178350 2011-12-15 04:45:31

52302 111215052628 172296 2011-12-15 03:12:23

52301 111215052627 178348 2011-12-15 02:37:32

52300 111215052626 174893 2011-12-15 02:18:56

52299 111215052625 169471 2011-12-15 01:33:46

52298 111215052624 178345 2011-12-15 01:04:41

52297 111215052623 176369 2011-12-15 01:02:20

52296 111215052622 178343 2011-12-15 00:38:02

52295 111215052621 178342 2011-12-15 00:18:43

52294 111215052620 178341 2011-12-15 00:14:37

52293 111215052619 178338 2011-12-15 00:13:07

order_items表:(明細ID item_id, 訂單ID order_id, 商品ID goods_id )

view plain copy

明細ID 訂單ID 商品ID

252578 52293 1016840

252579 52293 1014040

252580 52294 1014200

252581 52294 1001012

252582 52294 1022245

252583 52294 1014724

252584 52294 1010731

252586 52295 1023399

252587 52295 1016840

252592 52296 1021134

252593 52296 1021133

252585 52295 1021840

252588 52295 1014040

252589 52296 1014040

252590 52296 1019043

orders表和order_items表,通過訂單id進行關聯,是一對多的關系。

下面開啟spark-shell,查詢在當天該電商網站,都有哪些使用者購買了什麼商品。

1.在Linux上,建立/data/spark3/join,用于存儲實驗資料。

view plain copy

mkdir -p /data/spark3/join

切換目錄到/data/spark3/join目錄下,并從http://192.168.1.100:60000/allfiles/spark3/join/order_items及http://192.168.1.100:60000/allfiles/spark3/join/orders下載下傳實驗資料。

view plain copy

cd /data/spark3/join

wget http://192.168.1.100:60000/allfiles/spark3/join/order_items

wget http://192.168.1.100:60000/allfiles/spark3/join/orders

2.在HDFS上建立/myspark3/join目錄,并将Linux上/data/spark3/join目錄下的資料,上傳到HDFS。

view plain copy

hadoop fs -mkdir -p /myspark3/join

hadoop fs -put /data/spark3/join/orders /myspark3/join

hadoop fs -put /data/spark3/join/order_items /myspark3/join

3.在Spark視窗建立兩個RDD,分别加載orders檔案以及order_items檔案中的資料。

view plain copy

val rdd1 = sc.textFile(“hdfs://localhost:9000/myspark3/join/orders”);

val rdd2 = sc.textFile(“hdfs://localhost:9000/myspark3/join/order_items”);

4.我們的目的是查詢每個使用者購買了什麼商品。是以對rdd1和rdd2進行map映射,得出關鍵的兩個列的資料。

view plain copy

val rdd11 = rdd1.map(line=> (line.split(’\t’)(0), line.split(’\t’)(2)) )

val rdd22 = rdd2.map(line=> (line.split(’\t’)(1), line.split(’\t’)(2)) )

5.将rdd11以及rdd22中的資料,根據Key值,進行Join關聯,得到最終結果。

view plain copy

val rddresult = rdd11 join rdd22

6.最後将結果輸出,檢視輸出效果。

view plain copy

rddresult.collect

最終的執行結果為:

7.将輸出資料進行格式化:

view plain copy

(52294,(178341,1014200)),

(52294,(178341,1001012)),

(52294,(178341,1022245)),

(52294,(178341,1014724)),

(52294,(178341,1010731)),

(52296,(178343,1021134)),

(52296,(178343,1021133)),

(52296,(178343,1014040)),

(52296,(178343,1019043)),

(52295,(178342,1023399)),

(52295,(178342,1016840)),

(52295,(178342,1021840)),

(52295,(178342,1014040)),

(52293,(178338,1016840)),

(52293,(178338,1014040))

可以看到上面資料關聯後一共有3列,分别為訂單ID,使用者ID,商品ID。

求平均值:電商網站都會對商品的通路情況進行統計。現有一個goods_visit檔案,存儲了全部商品及各商品的點選次數。還有一個檔案goods,記錄了商品的基本資訊。兩張表的資料結構如下:

goods表:商品ID(goods_id),商品狀态(goods_status),商品分類id(cat_id),評分(goods_score)

goods_visit表:商品ID(goods_id),商品點選次數(click_num)

商品表(goods)及商品通路情況表(goods_visit)可以根據商品id進行關聯。現在統計每個分類下,商品的平均點選次數是多少?

1.在Linux上,建立目錄/data/spark3/avg,用于存儲實驗資料。

view plain copy

mkdir -p /data/spark3/avg

切換到/data/spark3/avg目錄下,并從http://192.168.1.100:60000/allfiles/spark3/avg/goods以及http://192.168.1.100:60000/allfiles/spark3/avg/goods_visit兩個網址下載下傳實驗資料。

view plain copy

cd /data/spark3/avg

wget http://192.168.1.100:60000/allfiles/spark3/avg/goods

wget http://192.168.1.100:60000/allfiles/spark3/avg/goods_visit

2.在HDFS上建立目錄/myspark3/avg,并将Linux/data/spark3/avg目錄下的資料,上傳到HDFS的/myspark3/avg。

view plain copy

hadoop fs -mkdir -p /myspark3/avg

hadoop fs -put /data/spark3/avg/goods /myspark3/avg

hadoop fs -put /data/spark3/avg/goods_visit /myspark3/avg

3.在Spark視窗建立兩個RDD,分别加載goods檔案以及goods_visit檔案中的資料。

view plain copy

val rdd1 = sc.textFile(“hdfs://localhost:9000/myspark3/avg/goods”)

val rdd2 = sc.textFile(“hdfs://localhost:9000/myspark3/avg/goods_visit”)

4.我們的目的是統計每個分類下,商品的平均點選次數,我們可以分三步來做。

首先,對rdd1和rdd2進行map映射,得出關鍵的兩個列的資料。

view plain copy

val rdd11 = rdd1.map(line=> (line.split(’\t’)(0), line.split(’\t’)(2)) )

val rdd22 = rdd2.map(line=> (line.split(’\t’)(0), line.split(’\t’)(1)) )

用collect()方法啟動程式。

view plain copy

rdd11.collect

檢視rdd11的結果如下:

view plain copy

rdd11.collect

res2: Array[(String, String)] = Array((1000002,52137), (1000003,52137), (1000004,52137), (1000006,52137),

(1000007,52137), (1000008,52137), (1000010,52137), (1000011,52137), (1000015,52137), (1000018,52137),

(1000020,52137), (1000021,52137), (1000025,52137), (1000028,52137), (1000030,52137), (1000033,52137),

(1000035,52137), (1000037,52137), (1000041,52137), (1000044,52137), (1000048,52137), (1000050,52137),

(1000053,52137), (1000057,52137), (1000059,52137), (1000063,52137), (1000065,52137), (1000067,52137),

(1000071,52137), (1000073,52137), (1000076,52137), (1000078,52137), (1000080,52137), (1000082,52137),

(1000084,52137), (1000086,52137), (1000087,52137), (1000088,52137), (1000090,52137), (1000091,52137),

(1000094,52137), (1000098,52137), (1000101,52137), (1000103,52137), (1000106,52…

scala>>

用collect()方法啟動程式。

view plain copy

rdd22.collect

檢視rdd22的結果如下:

view plain copy

rdd22.collect

res3: Array[(String, String)] = Array((1010000,4), (1010001,0), (1010002,0), (1010003,0), (1010004,0),

(1010005,0), (1010006,74), (1010007,0), (1010008,0), (1010009,1081), (1010010,0), (1010011,0), (1010012,0),

(1010013,44), (1010014,1), (1010018,0), (1010019,542), (1010020,1395), (1010021,18), (1010022,13), (1010023,27),

(1010024,22), (1010025,295), (1010026,13), (1010027,1), (1010028,410), (1010029,2), (1010030,8), (1010031,6),

(1010032,729), (1010033,72), (1010034,3), (1010035,328), (1010036,153), (1010037,100), (1010038,4), (1010039,3),

(1010040,69), (1010041,1), (1010042,1), (1010043,21), (1010044,268), (1010045,11), (1010046,1), (1010047,1),

(1010048,59), (1010049,15), (1010050,19), (1010051,424), (1010052,462), (1010053,9), (1010054,41), (1010055,64),

(1010056,10), (1010057,3), (…

scala>

然後,将rdd11以及rdd22中的資料根據商品ID,也就是key值進行關聯,得到一張大表。表結構變為:(商品id,(商品分類,商品點選次數))

view plain copy

val rddjoin = rdd11 join rdd22

用collect()方法啟動程式。

view plain copy

rddjoin.collect

檢視rddjoin的結果如下:

view plain copy

rddjoin.collect

res4: Array[(String, (String, String))] = Array((1013900,(52137,0)), (1010068,(52007,1316)), (1018970,(52006,788)),

(1020975,(52091,68)), (1019960,(52111,0)), (1019667,(52045,16)), (1010800,(52137,6)), (1019229,(52137,20)), (1022649,

(52119,90)), (1020382,(52137,0)), (1022667,(52021,150)), (1017258,(52086,0)), (1021963,(52072,83)), (1015809,(52137,285)),

(1024340,(52084,0)), (1011043,(52132,0)), (1011762,(52137,2)), (1010976,(52132,34)), (1010512,(52090,8)), (1023965,(52095,0)),

(1017285,(52069,41)), (1020212,(52026,46)), (1010743,(52137,0)), (1020524,(52064,52)), (1022577,(52090,13)), (1021974,(52069,22)),

(1010543,(52137,0)), (1010598,(52136,53)), (1017212,(52108,45)), (1010035,(52006,328)), (1010947,(52089,8)), (1020964,(52071,86)),

(1024001,(52063,0)), (1020191,(52046,0)), (1015739,(…

scala>

最後,在大表的基礎上,進行統計。得到每個分類,商品的平均點選次數。

view plain copy

rddjoin.map(x=>{(x._2._1, (x._2._2.toLong, 1))}).reduceByKey((x,y)=>{(x._1+y._1, x._2+y._2)}).map(x=>{(x._1, x._2._1*1.0/x._2._2)}).collect

将結果輸出,檢視輸出效果。

view plain copy

scala> rddjoin.map(x=>{(x._2._1, (x._2._2.toLong, 1))}).reduceByKey((x,y)=>{(x._1+y._1, x._2+y._2)}).map(x=>

{(x._1, x._2._1*1.0/x._2._2)}).collect

res40: Array[(String, Double)] = Array((52009,463.3642857142857), (52135,36.69230769230769), (52128,9.0), (52072,42.8),

(52078,16.5), (52137,34.735241502683365), (52047,20.96551724137931), (52050,0.0), (52056,24.57894736842105),

(52087,17.008928571428573), (52085,31.17142857142857), (52007,547.3076923076923), (52052,19.6), (52081,50.833333333333336),

(52016,106.75), (52058,34.23170731707317), (52124,0.0), (52092,28.453703703703702), (52065,8.644444444444444), (52106,22.5),

(52120,96.7843137254902), (52027,114.7), (52089,17.81159420289855), (52098,57.793103448275865), (52038,74.2), (52061,52.609375),

(52104,49.0), (52014,45.4), (52012,53.26), (52100,22.0), (52043,23.0), (52030,532.48), (52023,150.0), (52083,57.857142857142854),

(52041,40.0), (52049,18.058823529411764), (52074,33.17647058…

scala>

繼續閱讀