天天看點

SparkSQL項目練習1 準備資料2 需求:各區域熱門商品Top3

文章目錄

  • 1 準備資料
  • 2 需求:各區域熱門商品Top3
    • 2.1 需求簡介
    • 2.2 思路分析
    • 2.3 代碼實作

1 準備資料

本文所需的資料

我們這次Spark-sql操作所有的資料均來自 Hive,首先在Hive中建立表,并導入資料。一共有3張表: 1張使用者行為表,1張城市表,1 張産品表

CREATE TABLE `user_visit_action`(
  `date` string,
  `user_id` bigint,
  `session_id` string,
  `page_id` bigint,
  `action_time` string,
  `search_keyword` string,
  `click_category_id` bigint,
  `click_product_id` bigint,
  `order_category_ids` string,
  `order_product_ids` string,
  `pay_category_ids` string,
  `pay_product_ids` string,
  `city_id` bigint)
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/data/user_visit_action.txt' into table sparkpractice.user_visit_action;

CREATE TABLE `product_info`(
  `product_id` bigint,
  `product_name` string,
  `extend_info` string)
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/data/product_info.txt' into table sparkpractice.product_info;

CREATE TABLE `city_info`(
  `city_id` bigint,
  `city_name` string,
  `area` string)
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/data/city_info.txt' into table sparkpractice.city_info;
           

2 需求:各區域熱門商品Top3

2.1 需求簡介

這裡的熱門商品是從點選量的次元來看的,計算各個區域前三大熱門商品,并備注上每個商品在主要城市中的分布比例,超過兩個城市用其他顯示。

例如:

*地區* *商品名稱* *點選次數* *城市備注*
*華北* 商品A 100000 北京21.2%,天津13.2%,其他65.6%
*華北* 商品P 80200 北京63.0%,太原10%,其他27.0%
*華北* 商品M 40000 北京63.0%,太原10%,其他27.0%
*東北* 商品J 92000 大連28%,遼甯17.0%,其他 55.0%

2.2 思路分析

Ø 使用 sql 來完成,碰到複雜的需求,可以使用 udf 或 udaf

Ø 查詢出來所有的點選記錄,并與 city_info 表連接配接,得到每個城市所在的地區,與 Product_info 表連接配接得到産品名稱

Ø 按照地區和商品名稱分組,統計出每個商品在每個地區的總點選次數

Ø 每個地區内按照點選次數降序排列

Ø 隻取前三名,并把結果儲存在資料庫中

Ø 城市備注需要自定義 UDAF 函數

2.3 代碼實作

  1. udaf 函數定義
class AreaClickUDAF extends UserDefinedAggregateFunction {
  // 輸入資料的類型:  北京  String
  override def inputSchema: StructType = {
    StructType(StructField("city_name", StringType) :: Nil)
    //        StructType(Array(StructField("city_name", StringType)))
  }

  // 緩存的資料的類型: 北京->1000, 天津->5000  Map,  總的點選量  1000/?
  override def bufferSchema: StructType = {
    // MapType(StringType, LongType) 還需要标注 map的key的類型和value的類型
    StructType(StructField("city_count", MapType(StringType, LongType)) :: StructField("total_count", LongType) :: Nil)
  }

  // 輸出的資料類型  "北京21.2%,天津13.2%,其他65.6%"  String
  override def dataType: DataType = StringType

  // 相同的輸入是否應用有相同的輸出.
  override def deterministic: Boolean = true

  // 給存儲資料初始化
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    //初始化map緩存
    buffer(0) = Map[String, Long]()
    // 初始化總的點選量
    buffer(1) = 0L
  }

  // 分區内合并 Map[城市名, 點選量]
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // 首先拿到城市名, 然後把城市名作為key去檢視map中是否存在, 如果存在就把對應的值 +1, 如果不存在, 則直接0+1
    val cityName = input.getString(0)
    //        val map: collection.Map[String, Long] = buffer.getMap[String, Long](0)
    val map: Map[String, Long] = buffer.getAs[Map[String, Long]](0)
    buffer(0) = map + (cityName -> (map.getOrElse(cityName, 0L) + 1L))
    // 碰到一個城市, 則總的點選量要+1
    buffer(1) = buffer.getLong(1) + 1L
  }

  // 分區間的合并
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val map1 = buffer1.getAs[Map[String, Long]](0)
    val map2 = buffer2.getAs[Map[String, Long]](0)

    // 把map1的鍵值對與map2中的累積, 最後指派給buffer1
    buffer1(0) = map1.foldLeft(map2) {
      case (map, (k, v)) =>
        map + (k -> (map.getOrElse(k, 0L) + v))
    }

    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // 最終的輸出. "北京21.2%,天津13.2%,其他65.6%"
  override def evaluate(buffer: Row): Any = {
    val cityCountMap = buffer.getAs[Map[String, Long]](0)
    val totalCount = buffer.getLong(1)

    var citysRatio: List[CityRemark] = cityCountMap.toList.sortBy(-_._2).take(2).map {
      case (cityName, count) => {
        CityRemark(cityName, count.toDouble / totalCount)
      }
    }
    // 如果城市的個數超過2才顯示其他
    if (cityCountMap.size > 2) {
      citysRatio = citysRatio :+ CityRemark("其他", citysRatio.foldLeft(1D)(_ - _.cityRatio))
    }
    citysRatio.mkString(", ")
  }
}


case class CityRemark(cityName: String, cityRatio: Double) {
  val formatter = new DecimalFormat("0.00%")
  override def toString: String = s"$cityName:${formatter.format(cityRatio)}"
}
           
  1. 具體實作
object SparkSQL {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("AreaClickApp")
      .enableHiveSupport()
      .getOrCreate()
    spark.sql("use sparkpractice")
    // 0 注冊自定義聚合函數
    spark.udf.register("city_remark", new AreaClickUDAF)
    // 1. 查詢出所有的點選記錄,并和城市表産品表做内連接配接
    spark.sql(
      """
        |select
        |    c.*,
        |    v.click_product_id,
        |    p.product_name
        |from user_visit_action v join city_info c join product_info p on v.city_id=c.city_id and v.click_product_id=p.product_id
        |where click_product_id>-1
      """.stripMargin).createOrReplaceTempView("t1")

    // 2. 計算每個區域, 每個産品的點選量
    spark.sql(
      """
        |select
        |    t1.area,
        |    t1.product_name,
        |    count(*) click_count,
        |    city_remark(t1.city_name)
        |from t1
        |group by t1.area, t1.product_name
      """.stripMargin).createOrReplaceTempView("t2")

    // 3. 對每個區域内産品的點選量進行倒序排列
    spark.sql(
      """
        |select
        |    *,
        |    rank() over(partition by t2.area order by t2.click_count desc) rank
        |from t2
      """.stripMargin).createOrReplaceTempView("t3")

    // 4. 每個區域取top3

    spark.sql(
      """
        |select
        |    *
        |from t3
        |where rank<=3
      """.stripMargin).show

    //釋放資源
    spark.stop()

  }
}
           

繼續閱讀