文章目錄
- 一、環境要求
- 二、資料描述
- 三、功能要求(要求使用分别使用 RDD 和 Spark SQL 兩種方式實作)
-
- 1、農産品市場個數統計
- 2、農産品種類統計
- 3、價格區間統計,計算山西省每種農産品的價格波動趨勢,即計算每天價格均值,并将結果輸出到控制台上
資料擷取:連結: https://pan.baidu.com/s/1XcHKF50aEHrB_hPRfTb0vQ 提取碼: 76nd
一、環境要求
- Hadoop+Hive+Spark+HBase 開發環境。
二、資料描述
1、資料背景
- 該資料每日進行采集彙總。資料範圍涵蓋全國主要省份(港澳台、西藏、海南暫無資料)的 180+的大型農産品批發市場,380+的農産品品類(由于季節性和地域性等特點,每日的資料中不一定會涵蓋全部的農産品品類)。
2、資料類型
- 農産品批發市場價格資料products.txt
中文名稱 | 英文名稱 | 資料類型 |
---|---|---|
農産品名稱(列 1) | name | String |
批發價格(列 2) | price | Double |
采集時間(列 3) | craw_time | String |
批發市場名稱(列 4) | market | String |
省份(列 5) | province | String |
城市(列 6) | city | String |
三、功能要求(要求使用分别使用 RDD 和 Spark SQL 兩種方式實作)
1、農産品市場個數統計
- 1)統計每個省份的農産品市場總數
- 2)統計沒有農産品市場的省份有哪些
2、農産品種類統計
- 1)根據農産品類型數量,統計排名前 3 名的省份
- 2)根據農産品類型數量,統計每個省份排名前 3 名的農産品市場
3、價格區間統計,計算山西省每種農産品的價格波動趨勢,即計算每天價格均值,并将結果輸出到控制台上
- 某種農産品的價格均值計算公式:
- PAVG = (PM1+PM2+…+PMn-max§-min§)/(N-2)
- 其中,P 表示價格,Mn 表示 market,即農産品市場。PM1 表示 M1 農産品市場的該産品價格,max§表示價格最大值,min§價格最小值。
case class Product(productName:String,price:String,craw_time:String,market:String,province:String,city:String)
//轉化rdd
val rdd = sc.textFile("file:///data/products.txt")
//定義schema資訊
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
val schemaString = "productName price craw_time market province city"
val fields = schemaString.split("\\s+").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val rowRDD = rdd.map(_.split("\\s+")).filter(_.size==6).map(x => Row(x(0), x(1),x(2),x(3), x(4),x(5)))
val pdf = spark.createDataFrame(rowRDD, schema)
pdf.printSchema()
pdf.createOrReplaceTempView("products")
//1、農産品市場個數統計
//1)統計每個省份的農産品市場總數
//sql
spark.sql("select province,count(distinct market) from products group by province").show
//rdd
rdd.map(_.split("\\s+")).filter(_.size==6).map(x=>(x(4),x(3))).groupByKey.map(x=>(x._1,x._2.toArray.distinct.size)).collect
//2)統計沒有農産品市場的省份有哪些
case class Province(province:String,nickName:String)
val pdf2 = sc.textFile("file:///data/allprovinces.txt").map(_.split("\\s+")).map(x=>Province(x(0),x(1))).toDF
pdf2.createOrReplaceTempView("province")
spark.sql("with t1 as (select p1.province,p1.nickName,p2.market from province p1 left join products p2 on p1.province=p2.province) select * from t1 where t1.market is null ").show()
//2、農産品種類統計
//1)根據農産品類型數量,統計排名前 3 名的省份
spark.sql("select province,count(distinct productName) c from products group by province order by c desc").show(3)
//2)根據農産品類型數量,統計每個省份排名前 3 名的農産品市場
spark.sql("with t1 as(select province,market,count(distinct productName) c from products group by province,market) ,t2 as(select t1.*,row_number() over(partition by province order by t1.c desc) as rank from t1) select t2.* from t2 where t2.rank<=3").show(10000)
/*
3、價格區間統計,計算山西省每種農産品的價格波動趨勢,即計算每天價格均值,并将
結果輸出到控制台上。
某種農産品的價格均值計算公式:
PAVG = (PM1+PM2+...+PMn-max(P)-min(P))/(N-2)
其中,P 表示價格,Mn 表示 market,即農産品市場。PM1 表示 M1 農産品市場的該
産品價格,max(P)表示價格最大值,min(P)價格最小值。
*/
spark.sql("select productName,if(count(1)>2,round((sum(price)-max(price)-min(price))/(count(1)-2)),round(sum(price)/count(1))) as avgPriceDay from products where province = '山西' group by productName").show(10000)