天天看点

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

第1章 SparkSQL 概述

Spark SQL 是 Spark 用于处理结构化数据的一个 模块

这里的机构化数据就是值类似数据库的二维数据表

1.2 Hive and SparkSQL

SparkSQL 的前身是 Shark,给熟悉后端开发 但又不理解 MapReduce 的技术人员提供快速上手的工具。

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

1.3 SparkSQL 特点

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

1.4 DataFrame 是什么

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

在数据库中,schema(发音 “skee-muh” 或者“skee-mah”,中文叫模式)是数据库的组织和结构,schemas 和schemata都可以作为复数形式。模式中包含了schema对象,可以是表(table)、列(column)、数据类型(data type)、视图(view)、存储过程(stored procedures)、关系(relationships)、主键(primary key)、外键(foreign key)等。数据库模式可以用一个可视化的图来表示,它显示了数据库对象及其相互之间的关系

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

1.5 DataSet 是什么

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

DataFrame使用示例

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

第2章 SparkSQL 核心编程

重点学习如何使用 Spark SQL 所提供的 DataFrame 和 DataSet 模型进行编程.,以及了解它们之间的关系和转换,掌握具体的 SQL 书写是学习的前期。

2.1 新的起点

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.2 DataFrame

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.2.1 创建 DataFrame

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.2.2 SQL 语法

写sql需要先有表

注意: df.createOrReplaceTempView(“people” 创建的是一个视图,数据库中有view和table的概念,其实都是表,只是view是只读的表不能修改,tbale是可读写的表。

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

Session 范围内是只临时表只在一个连接内有效,在其他连接中无效。

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.2.3 DSL 语法

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.2.4 RDD 转换为 DataFrame

RDD 转换为 DataFrame, RDD 中数据没有机构,需要告诉DataFrame 数据的结构,就可以转换

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.2.5 DataFrame 转换为 RDD

DataFrame 转换为 RDD,RDD不需要数据结构,可以直接转换。

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.3 DataSet

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.3.1 创建 DataSet

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.3.2 RDD 转换为 DataSet

注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.3.3 DataSet 转换为 RDD

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.4 DataFrame 和 DataSet 转换

DataFrame 和 DataSet 的主要区别时DataSet需要数据类型,DataFrame转DataSet指定类型即可

val ds = df.as[User]

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.5 RDD、DataFrame、DataSet 三者的关系

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.5.1 三者的共性

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.5.2 三者的区别

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.5.3 三者的互相转换

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.6 IDEA 开发 SparkSQL

实际开发中,都是使用 IDEA 进行开发的。

2.6.1 添加依赖

<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-sql_2.12</artifactId>
 <version>3.0.0</version>
</dependency>
           

2.6.2 代码实现

object SparkSQL01_Demo {
 def main(args: Array[String]): Unit = {
 //创建上下文环境配置对象
 val conf: SparkConf = new 
SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
 //创建 SparkSession 对象
 val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
 //RDD=>DataFrame=>DataSet 转换需要引入隐式转换规则,否则无法转换
 //spark 不是包名,是上下文环境对象名
 import spark.implicits._
 //读取 json 文件 创建 DataFrame {"username": "lisi","age": 18}
 val df: DataFrame = spark.read.json("input/test.json")
 //df.show()
 //SQL 风格语法
 df.createOrReplaceTempView("user")
 //spark.sql("select avg(age) from user").show
 //DSL 风格语法
 //df.select("username","age").show()
 //*****RDD=>DataFrame=>DataSet*****
 //RDD
 val rdd1: RDD[(Int, String, Int)] = 
spark.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",28),(3,"wangwu",
20)))
 //DataFrame
 val df1: DataFrame = rdd1.toDF("id","name","age")
 //df1.show()
 //DateSet
 val ds1: Dataset[User] = df1.as[User]
 //ds1.show()
 //*****DataSet=>DataFrame=>RDD*****
 //DataFrame
 val df2: DataFrame = ds1.toDF()
 //RDD 返回的 RDD 类型为 Row,里面提供的 getXXX 方法可以获取字段值,类似 jdbc 处理结果集,
但是索引从 0 开始
 val rdd2: RDD[Row] = df2.rdd
 //rdd2.foreach(a=>println(a.getString(1)))
 //*****RDD=>DataSet*****
 rdd1.map{
 case (id,name,age)=>User(id,name,age)
 }.toDS()
 //*****DataSet=>=>RDD*****
 ds1.rdd
 //释放资源
 spark.stop()
 } }
 //样例类
case class User(id:Int,name:String,age:Int)
           

2.7 用户自定义函数

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.7.1 UDF

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Spark02_SparkSQL_UDF {

    def main(args: Array[String]): Unit = {

        // TODO 创建SparkSQL的运行环境
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()
        import spark.implicits._

        val df = spark.read.json("datas/user.json")
        df.createOrReplaceTempView("user")

        //自定义一个函数,函数名为prefixName,传入参数为name:String
        spark.udf.register("prefixName", (name:String) => {
            "Name: " + name
        })
        //使用自定义的函数
        spark.sql("select age, prefixName(username) from user").show
        
        // TODO 关闭环境
        spark.close()
    }
}

           
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.7.2 UDAF

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

实现方式 - UDAF - 强类型

求年龄的平均值

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession, TypedColumn, functions}

object Spark03_SparkSQL_UDAF2 {

    def main(args: Array[String]): Unit = {

        // TODO 创建SparkSQL的运行环境
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()
        import spark.implicits._
        val df = spark.read.json("datas/user.json")

        // 早期版本中,spark不能在sql中使用强类型UDAF操作
        // SQL & DSL
        // 早期的UDAF强类型聚合函数使用DSL语法操作
        val ds: Dataset[User] = df.as[User]

        // 将UDAF函数转换为查询的列对象
        val udafCol: TypedColumn[User, Long] = new MyAvgUDAF().toColumn

        ds.select(udafCol).show


        // TODO 关闭环境
        spark.close()
    }
    /*
     自定义聚合函数类:计算年龄的平均值
     1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
         IN : 输入的数据类型 User
         BUF : 缓冲区的数据类型 Buff
         OUT : 输出的数据类型 Long
     2. 重写方法(6)
     */
    case class User(username:String, age:Long)
    case class Buff( var total:Long, var count:Long )
    class MyAvgUDAF extends Aggregator[User, Buff, Long]{
        // z & zero : 初始值或零值
        // 缓冲区的初始化
        override def zero: Buff = {
            Buff(0L,0L)
        }

        // 根据输入的数据更新缓冲区的数据
        override def reduce(buff: Buff, in: User): Buff = {
            buff.total = buff.total + in.age
            buff.count = buff.count + 1
            buff
        }

        // 因为是分布式计算有多个缓冲区,需要合并每个缓冲区数据(即合并每个分区的计算结果)
        override def merge(buff1: Buff, buff2: Buff): Buff = {
            buff1.total = buff1.total + buff2.total
            buff1.count = buff1.count + buff2.count
            buff1
        }

        //根据最后的结果,再执行具体的业务计算逻辑
        override def finish(buff: Buff): Long = {
            buff.total / buff.count
        }

        // 缓冲区的编码操作
        override def bufferEncoder: Encoder[Buff] = Encoders.product

        // 输出的编码操作
        override def outputEncoder: Encoder[Long] = Encoders.scalaLong
    }
}


           

2.8 数据的加载和保存

2.8.1 通用的加载和保存方式

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.8.2 Parquet

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.8.3 JSON

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.8.4 CSV

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2.8.5 MySQL

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

1)导入依赖

<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.27</version>
</dependency>
           

2)读取数据

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
val conf: SparkConf = new 
SparkConf().setMaster("local[*]").setAppName("SparkSQL")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
//方式 1:通用的 load 方法读取
spark.read.format("jdbc")
 .option("url", "jdbc:mysql://linux1:3306/spark-sql")
 .option("driver", "com.mysql.jdbc.Driver")
 .option("user", "root")
 .option("password", "123123")
 .option("dbtable", "user")
 .load().show
//方式 2:通用的 load 方法读取 参数另一种形式
spark.read.format("jdbc")
 .options(Map("url"->"jdbc:mysql://linux1:3306/spark-sql?user=root&password=
123123",
 "dbtable"->"user","driver"->"com.mysql.jdbc.Driver")).load().show
//方式 3:使用 jdbc 方法读取
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123123")
val df: DataFrame = spark.read.jdbc("jdbc:mysql://linux1:3306/spark-sql", 
"user", props)
df.show
//释放资源
spark.stop()
           

3)写入数据

case class User2(name: String, age: Long)
。。。
val conf: SparkConf = new 
SparkConf().setMaster("local[*]").setAppName("SparkSQL")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val rdd: RDD[User2] = spark.sparkContext.makeRDD(List(User2("lisi", 20), 
User2("zs", 30)))
val ds: Dataset[User2] = rdd.toDS
//方式 1:通用的方式 format 指定写出类型
ds.write
 .format("jdbc")
 .option("url", "jdbc:mysql://linux1:3306/spark-sql")
 .option("user", "root")
 .option("password", "123123")
 .option("dbtable", "user")
 .mode(SaveMode.Append)
 .save()
//方式 2:通过 jdbc 方法
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123123")
ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://linux1:3306/spark-sql", 
"user", props)
//释放资源
spark.stop()
           

2.8.6 Hive

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

1)内嵌的 HIVE

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

2)外部的 HIVE

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
5)代码操作 Hive

1)导入依赖

<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-hive_2.12</artifactId>
 <version>3.0.0</version>
</dependency>
<dependency>
 <groupId>org.apache.hive</groupId>
 <artifactId>hive-exec</artifactId>
 <version>1.2.1</version>
</dependency>
<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.27</version>
</dependency>
           

2)将 hive-site.xml 文件拷贝到项目的 resources 目录中,代码实现

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
//创建 SparkSession
val spark: SparkSession = SparkSession
 .builder()
 .enableHiveSupport()
 .master("local[*]")
 .appName("sql")
 .getOrCreate()
           
package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql._

object Spark05_SparkSQL_Hive {

    def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")
        // TODO 创建SparkSQL的运行环境
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
        /启用Hive的支持
        val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

        // 使用SparkSQL连接外置的Hive
        // 1. 拷贝Hive-size.xml文件到classpath下
        // 2. 启用Hive的支持
        // 3. 增加对应的依赖关系(包含MySQL驱动)
        spark.sql("show tables").show

        // TODO 关闭环境
        spark.close()
    }
}
           
尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

第3章 SparkSQL 项目实战

3.1 数据准备

我们这次 Spark-sql 操作中所有的数据均来自 Hive,首先在 Hive 中创建表,,并导入数据。

一共有 3 张表: 1 张用户行为表,1 张城市表,1 张产品表

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql._

object Spark06_SparkSQL_Test {

    def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
        val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

        spark.sql("use atguigu")




        // 准备数据
        //多行写sql的方式,在hive中创建表
        spark.sql(
            """
              |CREATE TABLE `user_visit_action`(
              |  `date` string,
              |  `user_id` bigint,
              |  `session_id` string,
              |  `page_id` bigint,
              |  `action_time` string,
              |  `search_keyword` string,
              |  `click_category_id` bigint,
              |  `click_product_id` bigint,
              |  `order_category_ids` string,
              |  `order_product_ids` string,
              |  `pay_category_ids` string,
              |  `pay_product_ids` string,
              |  `city_id` bigint)
              |row format delimited fields terminated by '\t'
            """.stripMargin)

        //加载本地数据
        spark.sql(
            """
              |load data local inpath 'datas/user_visit_action.txt' into table atguigu.user_visit_action
            """.stripMargin)

        spark.sql(
            """
              |CREATE TABLE `product_info`(
              |  `product_id` bigint,
              |  `product_name` string,
              |  `extend_info` string)
              |row format delimited fields terminated by '\t'
            """.stripMargin)

        spark.sql(
            """
              |load data local inpath 'datas/product_info.txt' into table atguigu.product_info
            """.stripMargin)

        spark.sql(
            """
              |CREATE TABLE `city_info`(
              |  `city_id` bigint,
              |  `city_name` string,
              |  `area` string)
              |row format delimited fields terminated by '\t'
            """.stripMargin)

        spark.sql(
            """
              |load data local inpath 'datas/city_info.txt' into table atguigu.city_info
            """.stripMargin)

        spark.sql("""select * from city_info""").show


        spark.close()
    }
}

           

3.2 需求:各区域热门商品 Top3

3.2.1 需求简介

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

3.2.2 需求分析

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战

3.2.3 功能实现

尚硅谷 SparkSQL 3.0第1章 SparkSQL 概述第3章 SparkSQL 项目实战
package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Aggregator

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

object Spark06_SparkSQL_Test2 {

    def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
        val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

        spark.sql("use atguigu")

        // 查询基本数据
        spark.sql(
            """
              |  select
              |     a.*,
              |     p.product_name,
              |     c.area,
              |     c.city_name
              |  from user_visit_action a
              |  join product_info p on a.click_product_id = p.product_id
              |  join city_info c on a.city_id = c.city_id
              |  where a.click_product_id > -1
            """.stripMargin).createOrReplaceTempView("t1")

        // 根据区域,商品进行数据聚合
        spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF()))
        spark.sql(
            """
              |  select
              |     area,
              |     product_name,
              |     count(*) as clickCnt,
              |     cityRemark(city_name) as city_remark
              |  from t1 group by area, product_name
            """.stripMargin).createOrReplaceTempView("t2")

        // 区域内对点击数量进行排行
        spark.sql(
            """
              |  select
              |      *,
              |      rank() over( partition by area order by clickCnt desc ) as rank
              |  from t2
            """.stripMargin).createOrReplaceTempView("t3")

        // 取前3名
        spark.sql(
            """
              | select
              |     *
              | from t3 where rank <= 3
            """.stripMargin).show(false)

        spark.close()
    }
    case class Buffer( var total : Long, var cityMap:mutable.Map[String, Long] )
    // 自定义聚合函数:实现城市备注功能
    // 1. 继承Aggregator, 定义泛型
    //    IN : 城市名称
    //    BUF : Buffer =>【总点击数量,Map[(city, cnt), (city, cnt)]】
    //    OUT : 备注信息
    // 2. 重写方法(6)
    class CityRemarkUDAF extends Aggregator[String, Buffer, String]{
        // 缓冲区初始化
        override def zero: Buffer = {
            Buffer(0, mutable.Map[String, Long]())
        }

        // 更新缓冲区数据
        override def reduce(buff: Buffer, city: String): Buffer = {
            buff.total += 1
            val newCount = buff.cityMap.getOrElse(city, 0L) + 1
            buff.cityMap.update(city, newCount)
            buff
        }

        //因为是分布式计算有多个缓冲区,需要合并每个缓冲区数据(即合并每个分区的计算结果)
        override def merge(buff1: Buffer, buff2: Buffer): Buffer = {
            buff1.total += buff2.total

            val map1 = buff1.cityMap
            val map2 = buff2.cityMap

            // 两个Map的合并操作方法1
//            buff1.cityMap = map1.foldLeft(map2) {
//                case ( map, (city, cnt) ) => {
//                    val newCount = map.getOrElse(city, 0L) + cnt
//                    map.update(city, newCount)
//                    map
//                }
//            }
            // 两个Map的合并操作方法2
            map2.foreach{
                case (city , cnt) => {
                    val newCount = map1.getOrElse(city, 0L) + cnt
                    map1.update(city, newCount)
                }
            }
            buff1.cityMap = map1
            buff1
        }
        根据最后的结果,再执行具体的业务计算逻辑: 将统计的结果生成字符串信息
        override def finish(buff: Buffer): String = {
            val remarkList = ListBuffer[String]()

            val totalcnt = buff.total
            val cityMap = buff.cityMap

            // 降序排列
            val cityCntList = cityMap.toList.sortWith(
                (left, right) => {
                    left._2 > right._2
                }
            ).take(2)

            val hasMore = cityMap.size > 2
            var rsum = 0L
            cityCntList.foreach{
                case ( city, cnt ) => {
                    val r = cnt * 100 / totalcnt
                    remarkList.append(s"${city} ${r}%")
                    rsum += r
                }
            }
            if ( hasMore ) {
                remarkList.append(s"其他 ${100 - rsum}%")
            }

            remarkList.mkString(", ")
        }

        override def bufferEncoder: Encoder[Buffer] = Encoders.product

        override def outputEncoder: Encoder[String] = Encoders.STRING
    }
}

           

继续阅读