第1章 SparkSQL 概述
Spark SQL 是 Spark 用于处理结构化数据的一个 模块
这里的机构化数据就是值类似数据库的二维数据表
1.2 Hive and SparkSQL
SparkSQL 的前身是 Shark,给熟悉后端开发 但又不理解 MapReduce 的技术人员提供快速上手的工具。
1.3 SparkSQL 特点
1.4 DataFrame 是什么
在数据库中,schema(发音 “skee-muh” 或者“skee-mah”,中文叫模式)是数据库的组织和结构,schemas 和schemata都可以作为复数形式。模式中包含了schema对象,可以是表(table)、列(column)、数据类型(data type)、视图(view)、存储过程(stored procedures)、关系(relationships)、主键(primary key)、外键(foreign key)等。数据库模式可以用一个可视化的图来表示,它显示了数据库对象及其相互之间的关系
1.5 DataSet 是什么
DataFrame使用示例
第2章 SparkSQL 核心编程
重点学习如何使用 Spark SQL 所提供的 DataFrame 和 DataSet 模型进行编程.,以及了解它们之间的关系和转换,掌握具体的 SQL 书写是学习的前期。
2.1 新的起点
2.2 DataFrame
2.2.1 创建 DataFrame
2.2.2 SQL 语法
写sql需要先有表
注意: df.createOrReplaceTempView(“people” 创建的是一个视图,数据库中有view和table的概念,其实都是表,只是view是只读的表不能修改,tbale是可读写的表。
Session 范围内是只临时表只在一个连接内有效,在其他连接中无效。
2.2.3 DSL 语法
2.2.4 RDD 转换为 DataFrame
RDD 转换为 DataFrame, RDD 中数据没有机构,需要告诉DataFrame 数据的结构,就可以转换
2.2.5 DataFrame 转换为 RDD
DataFrame 转换为 RDD,RDD不需要数据结构,可以直接转换。
2.3 DataSet
2.3.1 创建 DataSet
2.3.2 RDD 转换为 DataSet
注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet
2.3.3 DataSet 转换为 RDD
2.4 DataFrame 和 DataSet 转换
DataFrame 和 DataSet 的主要区别时DataSet需要数据类型,DataFrame转DataSet指定类型即可
val ds = df.as[User]
2.5 RDD、DataFrame、DataSet 三者的关系
2.5.1 三者的共性
2.5.2 三者的区别
2.5.3 三者的互相转换
2.6 IDEA 开发 SparkSQL
实际开发中,都是使用 IDEA 进行开发的。
2.6.1 添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
2.6.2 代码实现
object SparkSQL01_Demo {
def main(args: Array[String]): Unit = {
//创建上下文环境配置对象
val conf: SparkConf = new
SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//RDD=>DataFrame=>DataSet 转换需要引入隐式转换规则,否则无法转换
//spark 不是包名,是上下文环境对象名
import spark.implicits._
//读取 json 文件 创建 DataFrame {"username": "lisi","age": 18}
val df: DataFrame = spark.read.json("input/test.json")
//df.show()
//SQL 风格语法
df.createOrReplaceTempView("user")
//spark.sql("select avg(age) from user").show
//DSL 风格语法
//df.select("username","age").show()
//*****RDD=>DataFrame=>DataSet*****
//RDD
val rdd1: RDD[(Int, String, Int)] =
spark.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",28),(3,"wangwu",
20)))
//DataFrame
val df1: DataFrame = rdd1.toDF("id","name","age")
//df1.show()
//DateSet
val ds1: Dataset[User] = df1.as[User]
//ds1.show()
//*****DataSet=>DataFrame=>RDD*****
//DataFrame
val df2: DataFrame = ds1.toDF()
//RDD 返回的 RDD 类型为 Row,里面提供的 getXXX 方法可以获取字段值,类似 jdbc 处理结果集,
但是索引从 0 开始
val rdd2: RDD[Row] = df2.rdd
//rdd2.foreach(a=>println(a.getString(1)))
//*****RDD=>DataSet*****
rdd1.map{
case (id,name,age)=>User(id,name,age)
}.toDS()
//*****DataSet=>=>RDD*****
ds1.rdd
//释放资源
spark.stop()
} }
//样例类
case class User(id:Int,name:String,age:Int)
2.7 用户自定义函数
2.7.1 UDF
package com.atguigu.bigdata.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object Spark02_SparkSQL_UDF {
def main(args: Array[String]): Unit = {
// TODO 创建SparkSQL的运行环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val df = spark.read.json("datas/user.json")
df.createOrReplaceTempView("user")
//自定义一个函数,函数名为prefixName,传入参数为name:String
spark.udf.register("prefixName", (name:String) => {
"Name: " + name
})
//使用自定义的函数
spark.sql("select age, prefixName(username) from user").show
// TODO 关闭环境
spark.close()
}
}
2.7.2 UDAF
实现方式 - UDAF - 强类型
求年龄的平均值
package com.atguigu.bigdata.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession, TypedColumn, functions}
object Spark03_SparkSQL_UDAF2 {
def main(args: Array[String]): Unit = {
// TODO 创建SparkSQL的运行环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val df = spark.read.json("datas/user.json")
// 早期版本中,spark不能在sql中使用强类型UDAF操作
// SQL & DSL
// 早期的UDAF强类型聚合函数使用DSL语法操作
val ds: Dataset[User] = df.as[User]
// 将UDAF函数转换为查询的列对象
val udafCol: TypedColumn[User, Long] = new MyAvgUDAF().toColumn
ds.select(udafCol).show
// TODO 关闭环境
spark.close()
}
/*
自定义聚合函数类:计算年龄的平均值
1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
IN : 输入的数据类型 User
BUF : 缓冲区的数据类型 Buff
OUT : 输出的数据类型 Long
2. 重写方法(6)
*/
case class User(username:String, age:Long)
case class Buff( var total:Long, var count:Long )
class MyAvgUDAF extends Aggregator[User, Buff, Long]{
// z & zero : 初始值或零值
// 缓冲区的初始化
override def zero: Buff = {
Buff(0L,0L)
}
// 根据输入的数据更新缓冲区的数据
override def reduce(buff: Buff, in: User): Buff = {
buff.total = buff.total + in.age
buff.count = buff.count + 1
buff
}
// 因为是分布式计算有多个缓冲区,需要合并每个缓冲区数据(即合并每个分区的计算结果)
override def merge(buff1: Buff, buff2: Buff): Buff = {
buff1.total = buff1.total + buff2.total
buff1.count = buff1.count + buff2.count
buff1
}
//根据最后的结果,再执行具体的业务计算逻辑
override def finish(buff: Buff): Long = {
buff.total / buff.count
}
// 缓冲区的编码操作
override def bufferEncoder: Encoder[Buff] = Encoders.product
// 输出的编码操作
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
}
2.8 数据的加载和保存
2.8.1 通用的加载和保存方式
2.8.2 Parquet
2.8.3 JSON
2.8.4 CSV
2.8.5 MySQL
1)导入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
2)读取数据
val conf: SparkConf = new
SparkConf().setMaster("local[*]").setAppName("SparkSQL")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
//方式 1:通用的 load 方法读取
spark.read.format("jdbc")
.option("url", "jdbc:mysql://linux1:3306/spark-sql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123123")
.option("dbtable", "user")
.load().show
//方式 2:通用的 load 方法读取 参数另一种形式
spark.read.format("jdbc")
.options(Map("url"->"jdbc:mysql://linux1:3306/spark-sql?user=root&password=
123123",
"dbtable"->"user","driver"->"com.mysql.jdbc.Driver")).load().show
//方式 3:使用 jdbc 方法读取
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123123")
val df: DataFrame = spark.read.jdbc("jdbc:mysql://linux1:3306/spark-sql",
"user", props)
df.show
//释放资源
spark.stop()
3)写入数据
case class User2(name: String, age: Long)
。。。
val conf: SparkConf = new
SparkConf().setMaster("local[*]").setAppName("SparkSQL")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val rdd: RDD[User2] = spark.sparkContext.makeRDD(List(User2("lisi", 20),
User2("zs", 30)))
val ds: Dataset[User2] = rdd.toDS
//方式 1:通用的方式 format 指定写出类型
ds.write
.format("jdbc")
.option("url", "jdbc:mysql://linux1:3306/spark-sql")
.option("user", "root")
.option("password", "123123")
.option("dbtable", "user")
.mode(SaveMode.Append)
.save()
//方式 2:通过 jdbc 方法
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123123")
ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://linux1:3306/spark-sql",
"user", props)
//释放资源
spark.stop()
2.8.6 Hive
1)内嵌的 HIVE
2)外部的 HIVE
5)代码操作 Hive
1)导入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
2)将 hive-site.xml 文件拷贝到项目的 resources 目录中,代码实现
//创建 SparkSession
val spark: SparkSession = SparkSession
.builder()
.enableHiveSupport()
.master("local[*]")
.appName("sql")
.getOrCreate()
package com.atguigu.bigdata.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql._
object Spark05_SparkSQL_Hive {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
// TODO 创建SparkSQL的运行环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
/启用Hive的支持
val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()
// 使用SparkSQL连接外置的Hive
// 1. 拷贝Hive-size.xml文件到classpath下
// 2. 启用Hive的支持
// 3. 增加对应的依赖关系(包含MySQL驱动)
spark.sql("show tables").show
// TODO 关闭环境
spark.close()
}
}
第3章 SparkSQL 项目实战
3.1 数据准备
我们这次 Spark-sql 操作中所有的数据均来自 Hive,首先在 Hive 中创建表,,并导入数据。
一共有 3 张表: 1 张用户行为表,1 张城市表,1 张产品表
package com.atguigu.bigdata.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql._
object Spark06_SparkSQL_Test {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()
spark.sql("use atguigu")
// 准备数据
//多行写sql的方式,在hive中创建表
spark.sql(
"""
|CREATE TABLE `user_visit_action`(
| `date` string,
| `user_id` bigint,
| `session_id` string,
| `page_id` bigint,
| `action_time` string,
| `search_keyword` string,
| `click_category_id` bigint,
| `click_product_id` bigint,
| `order_category_ids` string,
| `order_product_ids` string,
| `pay_category_ids` string,
| `pay_product_ids` string,
| `city_id` bigint)
|row format delimited fields terminated by '\t'
""".stripMargin)
//加载本地数据
spark.sql(
"""
|load data local inpath 'datas/user_visit_action.txt' into table atguigu.user_visit_action
""".stripMargin)
spark.sql(
"""
|CREATE TABLE `product_info`(
| `product_id` bigint,
| `product_name` string,
| `extend_info` string)
|row format delimited fields terminated by '\t'
""".stripMargin)
spark.sql(
"""
|load data local inpath 'datas/product_info.txt' into table atguigu.product_info
""".stripMargin)
spark.sql(
"""
|CREATE TABLE `city_info`(
| `city_id` bigint,
| `city_name` string,
| `area` string)
|row format delimited fields terminated by '\t'
""".stripMargin)
spark.sql(
"""
|load data local inpath 'datas/city_info.txt' into table atguigu.city_info
""".stripMargin)
spark.sql("""select * from city_info""").show
spark.close()
}
}
3.2 需求:各区域热门商品 Top3
3.2.1 需求简介
3.2.2 需求分析
3.2.3 功能实现
package com.atguigu.bigdata.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Aggregator
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
object Spark06_SparkSQL_Test2 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()
spark.sql("use atguigu")
// 查询基本数据
spark.sql(
"""
| select
| a.*,
| p.product_name,
| c.area,
| c.city_name
| from user_visit_action a
| join product_info p on a.click_product_id = p.product_id
| join city_info c on a.city_id = c.city_id
| where a.click_product_id > -1
""".stripMargin).createOrReplaceTempView("t1")
// 根据区域,商品进行数据聚合
spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF()))
spark.sql(
"""
| select
| area,
| product_name,
| count(*) as clickCnt,
| cityRemark(city_name) as city_remark
| from t1 group by area, product_name
""".stripMargin).createOrReplaceTempView("t2")
// 区域内对点击数量进行排行
spark.sql(
"""
| select
| *,
| rank() over( partition by area order by clickCnt desc ) as rank
| from t2
""".stripMargin).createOrReplaceTempView("t3")
// 取前3名
spark.sql(
"""
| select
| *
| from t3 where rank <= 3
""".stripMargin).show(false)
spark.close()
}
case class Buffer( var total : Long, var cityMap:mutable.Map[String, Long] )
// 自定义聚合函数:实现城市备注功能
// 1. 继承Aggregator, 定义泛型
// IN : 城市名称
// BUF : Buffer =>【总点击数量,Map[(city, cnt), (city, cnt)]】
// OUT : 备注信息
// 2. 重写方法(6)
class CityRemarkUDAF extends Aggregator[String, Buffer, String]{
// 缓冲区初始化
override def zero: Buffer = {
Buffer(0, mutable.Map[String, Long]())
}
// 更新缓冲区数据
override def reduce(buff: Buffer, city: String): Buffer = {
buff.total += 1
val newCount = buff.cityMap.getOrElse(city, 0L) + 1
buff.cityMap.update(city, newCount)
buff
}
//因为是分布式计算有多个缓冲区,需要合并每个缓冲区数据(即合并每个分区的计算结果)
override def merge(buff1: Buffer, buff2: Buffer): Buffer = {
buff1.total += buff2.total
val map1 = buff1.cityMap
val map2 = buff2.cityMap
// 两个Map的合并操作方法1
// buff1.cityMap = map1.foldLeft(map2) {
// case ( map, (city, cnt) ) => {
// val newCount = map.getOrElse(city, 0L) + cnt
// map.update(city, newCount)
// map
// }
// }
// 两个Map的合并操作方法2
map2.foreach{
case (city , cnt) => {
val newCount = map1.getOrElse(city, 0L) + cnt
map1.update(city, newCount)
}
}
buff1.cityMap = map1
buff1
}
根据最后的结果,再执行具体的业务计算逻辑: 将统计的结果生成字符串信息
override def finish(buff: Buffer): String = {
val remarkList = ListBuffer[String]()
val totalcnt = buff.total
val cityMap = buff.cityMap
// 降序排列
val cityCntList = cityMap.toList.sortWith(
(left, right) => {
left._2 > right._2
}
).take(2)
val hasMore = cityMap.size > 2
var rsum = 0L
cityCntList.foreach{
case ( city, cnt ) => {
val r = cnt * 100 / totalcnt
remarkList.append(s"${city} ${r}%")
rsum += r
}
}
if ( hasMore ) {
remarkList.append(s"其他 ${100 - rsum}%")
}
remarkList.mkString(", ")
}
override def bufferEncoder: Encoder[Buffer] = Encoders.product
override def outputEncoder: Encoder[String] = Encoders.STRING
}
}