天天看點

spark sql練習一、環境要求二、資料描述三、功能要求(要求使用分别使用 RDD 和 Spark SQL 兩種方式實作)

文章目錄

  • 一、環境要求
  • 二、資料描述
  • 三、功能要求(要求使用分别使用 RDD 和 Spark SQL 兩種方式實作)
    • 1、農産品市場個數統計
    • 2、農産品種類統計
    • 3、價格區間統計,計算山西省每種農産品的價格波動趨勢,即計算每天價格均值,并将結果輸出到控制台上

資料擷取:連結: https://pan.baidu.com/s/1XcHKF50aEHrB_hPRfTb0vQ 提取碼: 76nd

一、環境要求

  • Hadoop+Hive+Spark+HBase 開發環境。

二、資料描述

1、資料背景

  • 該資料每日進行采集彙總。資料範圍涵蓋全國主要省份(港澳台、西藏、海南暫無資料)的 180+的大型農産品批發市場,380+的農産品品類(由于季節性和地域性等特點,每日的資料中不一定會涵蓋全部的農産品品類)。

2、資料類型

  • 農産品批發市場價格資料products.txt
中文名稱 英文名稱 資料類型
農産品名稱(列 1) name String
批發價格(列 2) price Double
采集時間(列 3) craw_time String
批發市場名稱(列 4) market String
省份(列 5) province String
城市(列 6) city String

三、功能要求(要求使用分别使用 RDD 和 Spark SQL 兩種方式實作)

1、農産品市場個數統計

  • 1)統計每個省份的農産品市場總數
    spark sql練習一、環境要求二、資料描述三、功能要求(要求使用分别使用 RDD 和 Spark SQL 兩種方式實作)
  • 2)統計沒有農産品市場的省份有哪些
    spark sql練習一、環境要求二、資料描述三、功能要求(要求使用分别使用 RDD 和 Spark SQL 兩種方式實作)

2、農産品種類統計

  • 1)根據農産品類型數量,統計排名前 3 名的省份
    spark sql練習一、環境要求二、資料描述三、功能要求(要求使用分别使用 RDD 和 Spark SQL 兩種方式實作)
  • 2)根據農産品類型數量,統計每個省份排名前 3 名的農産品市場
    spark sql練習一、環境要求二、資料描述三、功能要求(要求使用分别使用 RDD 和 Spark SQL 兩種方式實作)

3、價格區間統計,計算山西省每種農産品的價格波動趨勢,即計算每天價格均值,并将結果輸出到控制台上

  • 某種農産品的價格均值計算公式:
  • PAVG = (PM1+PM2+…+PMn-max§-min§)/(N-2)
  • 其中,P 表示價格,Mn 表示 market,即農産品市場。PM1 表示 M1 農産品市場的該産品價格,max§表示價格最大值,min§價格最小值。
spark sql練習一、環境要求二、資料描述三、功能要求(要求使用分别使用 RDD 和 Spark SQL 兩種方式實作)
case class Product(productName:String,price:String,craw_time:String,market:String,province:String,city:String)
//轉化rdd
val rdd = sc.textFile("file:///data/products.txt")
//定義schema資訊
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
val schemaString = "productName price craw_time market province city"
val fields = schemaString.split("\\s+").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val rowRDD = rdd.map(_.split("\\s+")).filter(_.size==6).map(x => Row(x(0), x(1),x(2),x(3), x(4),x(5)))
val pdf = spark.createDataFrame(rowRDD, schema)
pdf.printSchema()
pdf.createOrReplaceTempView("products")
//1、農産品市場個數統計
//1)統計每個省份的農産品市場總數
//sql
spark.sql("select province,count(distinct market) from products group by province").show
//rdd
rdd.map(_.split("\\s+")).filter(_.size==6).map(x=>(x(4),x(3))).groupByKey.map(x=>(x._1,x._2.toArray.distinct.size)).collect
//2)統計沒有農産品市場的省份有哪些
case class Province(province:String,nickName:String)
val pdf2 = sc.textFile("file:///data/allprovinces.txt").map(_.split("\\s+")).map(x=>Province(x(0),x(1))).toDF
pdf2.createOrReplaceTempView("province")
spark.sql("with t1 as (select p1.province,p1.nickName,p2.market from province p1 left join products p2 on p1.province=p2.province) select * from t1 where t1.market is null ").show()
//2、農産品種類統計
//1)根據農産品類型數量,統計排名前 3 名的省份
spark.sql("select province,count(distinct productName) c from products group by province order by c desc").show(3)
//2)根據農産品類型數量,統計每個省份排名前 3 名的農産品市場
spark.sql("with t1 as(select province,market,count(distinct productName) c from products group by province,market) ,t2 as(select t1.*,row_number() over(partition by province order by t1.c desc) as rank from t1) select t2.* from t2 where t2.rank<=3").show(10000)
/*
3、價格區間統計,計算山西省每種農産品的價格波動趨勢,即計算每天價格均值,并将
結果輸出到控制台上。
某種農産品的價格均值計算公式:
PAVG = (PM1+PM2+...+PMn-max(P)-min(P))/(N-2)
其中,P 表示價格,Mn 表示 market,即農産品市場。PM1 表示 M1 農産品市場的該
産品價格,max(P)表示價格最大值,min(P)價格最小值。
*/
spark.sql("select productName,if(count(1)>2,round((sum(price)-max(price)-min(price))/(count(1)-2)),round(sum(price)/count(1))) as avgPriceDay from products where province = '山西' group by productName").show(10000)
           

繼續閱讀