本文目录如下:
- 第1章 Spark SQL概述
-
- 1.1 DataFrame(数据帧)简介
- 1.2 DataSet(数据集)简介
- 第2章 Spark SQL核心编程
-
- 2.1 新的起点
- 2.2 **`DataFrame`**(重要)
-
- 2.2.1 创建 DataFrame
-
- 2.2.1.1 从 Spark 数据源进行创建
- 2.2.1.2 从 RDD 进行转换
- 2.2.1.3 从 Hive Table 进行查询返回
- 2.2.2 SQL 语法
- 2.2.3 DSL 语法(了解)
- 2.2.4 RDD 转换为 DataFrame
- 2.2.5 DataFrame 转换为 RDD
- 2.3 DataSet(重要)
-
- 2.3.1 创建 DataSet
- 2.3.2 RDD 转换为 DataSet
- 2.3.3 DataSet 转换为 RDD
- 2.4 DataFrame 和 DataSet 转换
-
- 2.4.1 DataFrame 转换为 DataSet
- 2.4.2 DataSet 转换为 DataFrame
- 2.5 RDD、DataFrame、DataSet 三者的关系
-
- 2.5.1 三者的区别
-
- 2.5.1.1 RDD
- 2.5.1.2 DataFrame
- 2.5.1.3 DataSet
第1章 Spark SQL概述
-
是Spark SQL
用于结构化数据(structured data)处理的Spark
模块。Spark
-
可以简化SparkSQL
的开发,提高开发效率,且执行效率非常快,所以实际工作中,基本上采用的就是RDD
。SparkSQL
为了简化Spark SQL
的开发,提高开发效率,提供了 2 个编程抽象,类似RDD
中的Spark Core
RDD
:
➢ (1).
➢ (2).DataFrame
DataSet
1.1 DataFrame(数据帧)简介
- 在
中,Spark
是一种以DataFrame
为基础的分布式数据集,类似于传统数据库中的二维表格。RDD
与DataFrame
的主要区别在于,前者带有RDD
元信息,即schema
所表示的二维表数据集的每一列都带有名称和类型。DataFrame
-
是为数据提供了DataFrame
的视图。可以把它当做数据库中的一张表来对待。Schema
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL90zZOBTNXpVM41mWtZkMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLzIGOxUTYjJWZhNWMmJWMiZWM4QjZiNWMykDZ1ITY3M2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
上图直观地体现了
DataFrame
和
RDD
的区别。
1.2 DataSet(数据集)简介
DataSet
是分布式数据集合。
DataSet
是
DataFrame
的一个扩展。它提供了 RDD 的优势(强类型,使用强大的
lambda
函数的能力)以及
Spark SQL
优化执行引擎的优点。
DataSet
也可以使用功能性的转换(操作
map
,
flatMap
,
filter
等等)。
-
是DataSet
的一个扩展,是DataFrame API
最新的数据抽象。SparkSQL
- 用户友好的
风格,既具有类型安全检查也具有API
的查询优化特性。DataFrame
- 用样例类来对
中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet
中的字段名称。DataSet
-
是强类型的。比如可以有DataSet
,DataSet[Car]
。DataSet[Person]
-
是DataFrame
的特列,DataSet
,所以可以通过DataFrame=DataSet[Row]
方法将as
转换为DataFrame
。DataSet
是一个类型,跟Row
、Car
这些的类型一样,所有的表结构信息都用Person
来表示。获取数据时需要指定顺序。Row
第2章 Spark SQL核心编程
2.1 新的起点
-
中,如果想要执行应用程序,需要首先构建上下文环境对象Spark Core
,SparkContext
其实可以理解为对Spark SQL
的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。Spark Core
-
是 Spark 最新的 SQL 查询起始点,实质上是SparkSession
和SQLContext
的组合,所以在HiveContext
和SQLContex
上可用的HiveContext
在API
上同样是可以使用的。SparkSession
内部封装了SparkSession
,所以计算实际上是由SparkContext
完成的。当我们使用SparkContext
的时候,spark-shell
框架会自动的创建一个名称叫做spark
的spark
对 象, 就像我们以前可以自动获取到一个SparkSession
来表示sc
对象一样。SparkContext
2.2 DataFrame
(重要)
DataFrame
-
的Spark SQL
允许我们使用DataFrame API
而不用必须去注册临时表或者生成 SQL 表达式。DataFrame
既有DataFrame API
也有transformation操作
。action操作
2.2.1 创建 DataFrame
在 Spark SQL 中
SparkSession
是创建
DataFrame
和执行
SQL
的入口,创建
DataFrame
有三种方式:
- (1) 通过 Spark 的数据源进行创建;
- (2) 从一个存在的 RDD 进行转换;
- (3) 从 Hive Table 进行查询返回。
2.2.1.1 从 Spark 数据源进行创建
- (1) 查看
支持创建文件的数据源格式。Spark
scala> spark.read.
# 注意:"."后面按 Tab 键,而不是回车键
csv format jdbc json load option options orc parquet schema table text textFile
注:
spark-shell.cmd
打开方式请参考: Spark运行模式 中第
5.2小节
。
- (2) 在 spark 的
目录中创建bin/input
文件。user.json
{"username":"谢清照", "age":"21"}
{"username":"朱玮琦", "age":"21"}
{"username":"信鸽", "age":"20"}
- (3) 读取
文件创建json
。DataFrame
scala> val df = spark.read.json("input/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
注意:如果从内存中获取数据,spark 可以知道数据类型具体是什么。如果是数字,默认作为 Int 处理;但是从文件中读取的数字,不能确定是什么类型,所以用 bigint 接收,可以和 Long 类型转换,但是和 Int 不能进行转换。
- (4) 展示结果
scala> df.show
+---+--------+
|age|username|
+---+--------+
| 21| 谢清照|
| 21| 朱玮琦|
| 20| 信鸽|
+---+--------+
2.2.1.2 从 RDD 进行转换
- 暂时跳过…
2.2.1.3 从 Hive Table 进行查询返回
- 暂时跳过…
2.2.2 SQL 语法
SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。
- (1) 读取
文件创建JSON
DataFrame
scala> val df = spark.read.json("input/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
- (2) 对
创建一个临时表DataFrame
- (3) 通过
语句实现查询全表SQL
scala> val sqlDF = spark.sql("select * from user")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- (4) 结果展示
scala> sqlDF.show
+---+--------+
|age|username|
+---+--------+
| 21| 谢清照|
| 21| 朱玮琦|
| 20| 小鸽子|
+---+--------+
注意:普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使 用全局临时表时需要全路径访问,如:global_temp.people。
- (5) 对于
创建一个全局表DataFrame
**注**
:由于博主使用的
Hadoop
版本与当前使用的
Spark
版本不兼容,这里报了错误。重新安装
Hadoop
环境后,又由于
Hadoop
安装路径以及
JDK
安装路径上包含空格而报错,重新安装后解决了此问题。
- (6) 通过
语句实现查询全表SQL
scala> spark.sql("SELECT * FROM global_temp.user").show()
+---+--------+
|age|username|
+---+--------+
| 21| 谢清照|
| 21| 朱玮琦|
| 20| 小鸽子|
+---+--------+
scala> spark.newSession().sql("SELECT * FROM global_temp.user").show()
+---+--------+
|age|username|
+---+--------+
| 21| 谢清照|
| 21| 朱玮琦|
| 20| 小鸽子|
+---+--------+
2.2.3 DSL 语法(了解)
DataFrame
提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在
Scala
,
Java
,
Python
和
R
中使用
DSL
,使用
DSL
语法风格不必去创建临时视图了。
- (1) 创建一个
DataFrame
scala> val df = spark.read.json("input/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- (2) 查看
的DataFrame
信息Schema
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- username: string (nullable = true)
- (3) 只查看 “
” 列数据username
scala> df.select("username").show()
+--------+
|username|
+--------+
| 谢清照|
| 朱玮琦|
| 小鸽子|
+--------+
- (4) 查看 “
” 列数据以及 “username
” 数据age-1
注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名。
scala> df.select($"username",$"age" - 1).show
scala> df.select('username, 'age - 1).show()
scala> df.select('username, 'age - 1 as "newage").show()
+--------+---------+
|username|(age - 1)|
+--------+---------+
| 谢清照| 20|
| 朱玮琦| 20|
| 小鸽子| 19|
+--------+---------+
- (5) 查看 “
” 小于"age
" 的数据21
scala> df.filter($"age"<21).show
+---+---------+
|age| username|
+---+---------+
| 20| 小鸽子|
+---+---------+
- (6) 按照 “
” 分组,查看数据条数age
scala> df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 21| 2|
| 20| 1|
+---+-----+
2.2.4 RDD 转换为 DataFrame
- 在 IDEA 中开发程序时,如果需要
与RDD
或者DF
之间互相操作,那么需要引入DS
import spark.implicits._
- 这里的
不是spark
中的包名,而是创建的Scala
对象的变量名称,所以必须先创建sparkSession
对象再导入。这里的SparkSession
对象不能使用spark
声明,因为var
只支持Scala
修饰的对象的引入。val
-
中无需导入,自动完成此操作。spark-shell
scala> val rdd = sc.textFile("input/id.txt")
scala> rdd.toDF("id").show
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
+---+
实际开发中,一般通过样例类将 RDD 转换为 DataFrame。
scala> case class User(name:String, age:Int)
defined class User
scala> sc.makeRDD(List(("谢清照",21), ("朱玮琦",20))).map(t=>User(t._1, t._2)).toDF.show
+--------+---+
| name|age|
+--------+---+
| 谢清照| 21|
| 朱玮琦| 20|
+--------+---+
2.2.5 DataFrame 转换为 RDD
DataFrame
其实就是对
RDD
的封装,所以可以直接获取内部的
RDD
。
scala> val df = sc.makeRDD(List(("谢清照",21), ("朱玮琦",20))).map(t=>User(t._1, t._2)).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[46] at rdd at <console>:25
scala> val array = rdd.collect
array: Array[org.apache.spark.sql.Row] = Array([谢清照,21], [朱玮琦,20])
注意:此时得到的 RDD 存储类型为 Row。
scala> array(0)
res28: org.apache.spark.sql.Row = [zhangsan,30]
scala> array(0)(0)
res29: Any = zhangsan
scala> array(0).getAs[String]("name")
res30: String = 谢清照
2.3 DataSet(重要)
-
是具有强类型的数据集合,需要提供对应的类型信息。DataSet
2.3.1 创建 DataSet
- (1) 使用样例类集合创建
DataSet
scala> case class Person(name: String, age: Long)
defined class Person
scala> val list = List(Person("谢清照",21), Person("朱玮琦",20))
list: List[Person] = List(Person(谢清照,21), Person(朱玮琦,20))
scala> val ds = list.toDS
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> ds.show
+------+---+
| name|age|
+------+---+
| 谢清照| 21|
| 朱玮琦| 20|
+------+---+
- (2) 使用基本类型的序列创建
DataSet
scala> val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.show
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
+-----+
注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet。
2.3.2 RDD 转换为 DataSet
-
能够自动将包含有SparkSQL
类的case
转换成RDD
,DataSet
类定义了case
的结构,table
类属性通过反射变成了表的列名。case
类可以包含诸如Case
或者Seq
等复杂的结构。Array
# 样例类
scala> case class User(name:String, age:Int)
defined class User
scala> val rdd = sc.makeRDD(List(User("谢清照", 21), User("朱玮琦", 20)))
rdd: org.apache.spark.rdd.RDD[User] = ParallelCollectionRDD[42] at makeRDD at <console>:26
# RDD 转换为 DataSet
scala> val ds = rdd.toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
scala> ds.show
+------+---+
| name|age|
+------+---+
| 谢清照| 21|
| 朱玮琦| 20|
+------+---+
2.3.3 DataSet 转换为 RDD
DataSet 其实也是对 RDD 的封装,所以可以直接获取内部的 RDD。
# ds 为上面例子中的 DataSet
scala> val rdd = ds.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at <console>:25
scala> rdd.collect
res14: Array[User] = Array(User(谢清照,21), User(朱玮琦,20))
2.4 DataFrame 和 DataSet 转换
-
其实是DataFrame
的特例,所以它们之间是可以互相转换的。DataSet
2.4.1 DataFrame 转换为 DataSet
# DataFrame
scala> val df = spark.read.json("input/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
# 样例类
scala> case class User(age:Long, username:String)
defined class User
# 转换为 DataSet
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [age: bigint, username: string]
2.4.2 DataSet 转换为 DataFrame
# Dataset[User] => DataFrame
# 转换为 DataFrame, 只保留了结构,去掉了 User 的数据类型
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
2.5 RDD、DataFrame、DataSet 三者的关系
在
SparkSQL
中
Spark
为我们提供了两个新的抽象,分别是
DataFrame
和
DataSet
。他们和
RDD
有什么区别呢?首先从版本的产生上来看:
- Spark1.0 => RDD
- Spark1.3 => DataFrame
- Spark1.6 => Dataset
如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的
Spark
版本中,
DataSet
有可能会逐步取代
RDD
和
DataFrame
成为唯一的 API 接口。
2.5.1 三者的区别
2.5.1.1 RDD
-
一般和RDD
同时使用spark mllib
-
不支持RDD
操作sparksql
2.5.1.2 DataFrame
- 与
和RDD
不同,Dataset
每一行的类型固定为DataFrame
,每一列的值没法直接访问,只有通过解析才能获取各个字段的值。Row
-
与DataFrame
一般不与DataSet
同时使用spark mllib
-
与DataFrame
均支持DataSet
的操作,比如SparkSQL
,select
之类,还能注册临时表/视窗,进行groupby
语句操作。sql
-
与DataFrame
支持一些特别方便的保存方式,比如保存成DataSet
,可以带上表头,这样每一列的字段名一目了然(后面专门讲解)。csv
2.5.1.3 DataSet
-
和Dataset
拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame
其实就是DataFrame
的一个特例DataSet
。type DataFrame = Dataset[Row]
-
也可以叫DataFrame
,每一行的类型是Dataset[Row]
,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的Row
方法或者共性中的第七条提到的模式匹配拿出特定字段。而getAS
中,每一行是什么类型是不一定的,在自定义了Dataset
之后可以很自由的获得每一行的信息case class
1.SparkSQL基础—Spark SQL概述、Spark SQL核心编程—DataFrame、DataSet第1章 Spark SQL概述第2章 Spark SQL核心编程
声明:本文是学习时记录的笔记,如有侵权请告知删除!
原视频地址:https://www.bilibili.com/video/BV11A411L7CK