天天看点

1.SparkSQL基础—Spark SQL概述、Spark SQL核心编程—DataFrame、DataSet第1章 Spark SQL概述第2章 Spark SQL核心编程

本文目录如下:

  • 第1章 Spark SQL概述
    • 1.1 DataFrame(数据帧)简介
    • 1.2 DataSet(数据集)简介
  • 第2章 Spark SQL核心编程
    • 2.1 新的起点
    • 2.2 **`DataFrame`**(重要)
      • 2.2.1 创建 DataFrame
        • 2.2.1.1 从 Spark 数据源进行创建
        • 2.2.1.2 从 RDD 进行转换
        • 2.2.1.3 从 Hive Table 进行查询返回
      • 2.2.2 SQL 语法
      • 2.2.3 DSL 语法(了解)
      • 2.2.4 RDD 转换为 DataFrame
      • 2.2.5 DataFrame 转换为 RDD
    • 2.3 DataSet(重要)
      • 2.3.1 创建 DataSet
      • 2.3.2 RDD 转换为 DataSet
      • 2.3.3 DataSet 转换为 RDD
    • 2.4 DataFrame 和 DataSet 转换
      • 2.4.1 DataFrame 转换为 DataSet
      • 2.4.2 DataSet 转换为 DataFrame
    • 2.5 RDD、DataFrame、DataSet 三者的关系
      • 2.5.1 三者的区别
        • 2.5.1.1 RDD
        • 2.5.1.2 DataFrame
        • 2.5.1.3 DataSet

第1章 Spark SQL概述

  • Spark SQL

    Spark

    用于结构化数据(structured data)处理的

    Spark

    模块。
  • SparkSQL

    可以简化

    RDD

    的开发,提高开发效率,且执行效率非常快,所以实际工作中,基本上采用的就是

    SparkSQL

    Spark SQL

    为了简化

    RDD

    的开发,提高开发效率,提供了 2 个编程抽象,类似

    Spark Core

    中的

    RDD

    :

    ➢ (1).

    DataFrame

    ➢ (2).

    DataSet

1.1 DataFrame(数据帧)简介

  • Spark

    中,

    DataFrame

    是一种以

    RDD

    为基础的分布式数据集,类似于传统数据库中的二维表格。

    DataFrame

    RDD

    的主要区别在于,前者带有

    schema

    元信息,即

    DataFrame

    所表示的二维表数据集的每一列都带有名称和类型。
  • DataFrame

    是为数据提供了

    Schema

    的视图。可以把它当做数据库中的一张表来对待。
1.SparkSQL基础—Spark SQL概述、Spark SQL核心编程—DataFrame、DataSet第1章 Spark SQL概述第2章 Spark SQL核心编程

上图直观地体现了

DataFrame

RDD

的区别。

1.2 DataSet(数据集)简介

DataSet

是分布式数据集合。

DataSet

DataFrame

的一个扩展。它提供了 RDD 的优势(强类型,使用强大的

lambda

函数的能力)以及

Spark SQL

优化执行引擎的优点。

DataSet

也可以使用功能性的转换(操作

map

flatMap

filter

等等)。

  • DataSet

    DataFrame API

    的一个扩展,是

    SparkSQL

    最新的数据抽象。
  • 用户友好的

    API

    风格,既具有类型安全检查也具有

    DataFrame

    的查询优化特性。
  • 用样例类来对

    DataSet

    中定义数据的结构信息,样例类中每个属性的名称直接映射到

    DataSet

    中的字段名称。
  • DataSet

    是强类型的。比如可以有

    DataSet[Car]

    DataSet[Person]

  • DataFrame

    DataSet

    的特列,

    DataFrame=DataSet[Row]

    ,所以可以通过

    as

    方法将

    DataFrame

    转换为

    DataSet

    Row

    是一个类型,跟

    Car

    Person

    这些的类型一样,所有的表结构信息都用

    Row

    来表示。获取数据时需要指定顺序。

第2章 Spark SQL核心编程

2.1 新的起点

  • Spark Core

    中,如果想要执行应用程序,需要首先构建上下文环境对象

    SparkContext

    Spark SQL

    其实可以理解为对

    Spark Core

    的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。
  • SparkSession

    是 Spark 最新的 SQL 查询起始点,实质上是

    SQLContext

    HiveContext

    的组合,所以在

    SQLContex

    HiveContext

    上可用的

    API

    SparkSession

    上同样是可以使用的。

    SparkSession

    内部封装了

    SparkContext

    ,所以计算实际上是由

    SparkContext

    完成的。当我们使用

    spark-shell

    的时候,

    spark

    框架会自动的创建一个名称叫做

    spark

    SparkSession

    对 象, 就像我们以前可以自动获取到一个

    sc

    来表示

    SparkContext

    对象一样。

2.2

DataFrame

(重要)

  • Spark SQL

    DataFrame API

    允许我们使用

    DataFrame

    而不用必须去注册临时表或者生成 SQL 表达式。

    DataFrame API

    既有

    transformation操作

    也有

    action操作

2.2.1 创建 DataFrame

在 Spark SQL 中

SparkSession

是创建

DataFrame

和执行

SQL

的入口,创建

DataFrame

有三种方式:

  • (1) 通过 Spark 的数据源进行创建;
  • (2) 从一个存在的 RDD 进行转换;
  • (3) 从 Hive Table 进行查询返回。

2.2.1.1 从 Spark 数据源进行创建

  • (1) 查看

    Spark

    支持创建文件的数据源格式。
scala> spark.read.	
# 注意:"."后面按 Tab 键,而不是回车键

csv format jdbc json load option options orc parquet schema table text textFile
           

注:

spark-shell.cmd

打开方式请参考: Spark运行模式 中第

5.2小节

  • (2) 在 spark 的

    bin/input

    目录中创建

    user.json

    文件。
{"username":"谢清照", "age":"21"}
{"username":"朱玮琦", "age":"21"}
{"username":"信鸽", "age":"20"}
           
  • (3) 读取

    json

    文件创建

    DataFrame

scala> val df = spark.read.json("input/user.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
           

注意:如果从内存中获取数据,spark 可以知道数据类型具体是什么。如果是数字,默认作为 Int 处理;但是从文件中读取的数字,不能确定是什么类型,所以用 bigint 接收,可以和 Long 类型转换,但是和 Int 不能进行转换。

  • (4) 展示结果
scala> df.show

+---+--------+
|age|username|
+---+--------+
| 21|   谢清照|
| 21|   朱玮琦|
| 20|     信鸽|
+---+--------+
           

2.2.1.2 从 RDD 进行转换

  • 暂时跳过…

2.2.1.3 从 Hive Table 进行查询返回

  • 暂时跳过…

2.2.2 SQL 语法

SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。

  • (1) 读取

    JSON

    文件创建

    DataFrame

scala> val df = spark.read.json("input/user.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
           
  • (2) 对

    DataFrame

    创建一个临时表
  • (3) 通过

    SQL

    语句实现查询全表
scala> val sqlDF = spark.sql("select * from user")

sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
           
  • (4) 结果展示
scala> sqlDF.show

+---+--------+
|age|username|
+---+--------+
| 21|   谢清照|
| 21|   朱玮琦|
| 20|   小鸽子|
+---+--------+
           

注意:普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使 用全局临时表时需要全路径访问,如:global_temp.people。

  • (5) 对于

    DataFrame

    创建一个全局表

**注**

:由于博主使用的

Hadoop

版本与当前使用的

Spark

版本不兼容,这里报了错误。重新安装

Hadoop

环境后,又由于

Hadoop

安装路径以及

JDK

安装路径上包含空格而报错,重新安装后解决了此问题。

  • (6) 通过

    SQL

    语句实现查询全表
scala> spark.sql("SELECT * FROM global_temp.user").show()
+---+--------+
|age|username|
+---+--------+
| 21|   谢清照|
| 21|   朱玮琦|
| 20|   小鸽子|
+---+--------+

scala> spark.newSession().sql("SELECT * FROM global_temp.user").show()
+---+--------+
|age|username|
+---+--------+
| 21|   谢清照|
| 21|   朱玮琦|
| 20|   小鸽子|
+---+--------+
           

2.2.3 DSL 语法(了解)

DataFrame

提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在

Scala

,

Java

,

Python

R

中使用

DSL

,使用

DSL

语法风格不必去创建临时视图了。

  • (1) 创建一个

    DataFrame

scala> val df = spark.read.json("input/user.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
           
  • (2) 查看

    DataFrame

    Schema

    信息
scala> df.printSchema

root
 |-- age: long (nullable = true)
 |-- username: string (nullable = true)
           
  • (3) 只查看 “

    username

    ” 列数据
scala> df.select("username").show()
+--------+
|username|
+--------+
|   谢清照|
|   朱玮琦|
|   小鸽子|
+--------+
           
  • (4) 查看 “

    username

    ” 列数据以及 “

    age-1

    ” 数据

    注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名。

scala> df.select($"username",$"age" - 1).show
scala> df.select('username, 'age - 1).show()
scala> df.select('username, 'age - 1 as "newage").show()

+--------+---------+
|username|(age - 1)|
+--------+---------+
|   谢清照|       20|
|   朱玮琦|       20|
|   小鸽子|       19|
+--------+---------+
           
  • (5) 查看 “

    age

    ” 小于"

    21

    " 的数据
scala> df.filter($"age"<21).show

+---+---------+
|age| username|
+---+---------+
| 20|    小鸽子|
+---+---------+
           
  • (6) 按照 “

    age

    ” 分组,查看数据条数
scala> df.groupBy("age").count.show

+---+-----+
|age|count|
+---+-----+
| 21|    2|
| 20|    1|
+---+-----+
           

2.2.4 RDD 转换为 DataFrame

  • 在 IDEA 中开发程序时,如果需要

    RDD

    DF

    或者

    DS

    之间互相操作,那么需要引入

    import spark.implicits._

  • 这里的

    spark

    不是

    Scala

    中的包名,而是创建的

    sparkSession

    对象的变量名称,所以必须先创建

    SparkSession

    对象再导入。这里的

    spark

    对象不能使用

    var

    声明,因为

    Scala

    只支持

    val

    修饰的对象的引入。
  • spark-shell

    中无需导入,自动完成此操作。
scala> val rdd = sc.textFile("input/id.txt")
scala> rdd.toDF("id").show
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+
           

实际开发中,一般通过样例类将 RDD 转换为 DataFrame。

scala> case class User(name:String, age:Int)
defined class User

scala> sc.makeRDD(List(("谢清照",21), ("朱玮琦",20))).map(t=>User(t._1, t._2)).toDF.show
+--------+---+
|    name|age|
+--------+---+
|   谢清照| 21|
|   朱玮琦| 20|
+--------+---+
           

2.2.5 DataFrame 转换为 RDD

DataFrame

其实就是对

RDD

的封装,所以可以直接获取内部的

RDD

scala> val df = sc.makeRDD(List(("谢清照",21), ("朱玮琦",20))).map(t=>User(t._1, t._2)).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[46] at rdd at <console>:25

scala> val array = rdd.collect
array: Array[org.apache.spark.sql.Row] = Array([谢清照,21], [朱玮琦,20])
           

注意:此时得到的 RDD 存储类型为 Row。

scala> array(0)
res28: org.apache.spark.sql.Row = [zhangsan,30]

scala> array(0)(0)
res29: Any = zhangsan

scala> array(0).getAs[String]("name")
res30: String = 谢清照
           

2.3 DataSet(重要)

  • DataSet

    是具有强类型的数据集合,需要提供对应的类型信息。

2.3.1 创建 DataSet

  • (1) 使用样例类集合创建

    DataSet

scala> case class Person(name: String, age: Long)
defined class Person

scala> val list = List(Person("谢清照",21), Person("朱玮琦",20))
list: List[Person] = List(Person(谢清照,21), Person(朱玮琦,20))

scala> val ds = list.toDS
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> ds.show
+------+---+
|  name|age|
+------+---+
| 谢清照| 21|
| 朱玮琦| 20|
+------+---+
           
  • (2) 使用基本类型的序列创建

    DataSet

scala> val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ds.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+
           

注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet。

2.3.2 RDD 转换为 DataSet

  • SparkSQL

    能够自动将包含有

    case

    类的

    RDD

    转换成

    DataSet

    case

    类定义了

    table

    的结构,

    case

    类属性通过反射变成了表的列名。

    Case

    类可以包含诸如

    Seq

    或者

    Array

    等复杂的结构。
# 样例类
scala> case class User(name:String, age:Int)
defined class User

scala> val rdd = sc.makeRDD(List(User("谢清照", 21), User("朱玮琦", 20)))
rdd: org.apache.spark.rdd.RDD[User] = ParallelCollectionRDD[42] at makeRDD at <console>:26

# RDD 转换为 DataSet
scala> val ds = rdd.toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> ds.show
+------+---+
|  name|age|
+------+---+
| 谢清照| 21|
| 朱玮琦| 20|
+------+---+
           

2.3.3 DataSet 转换为 RDD

DataSet 其实也是对 RDD 的封装,所以可以直接获取内部的 RDD。

# ds 为上面例子中的 DataSet
scala> val rdd = ds.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at <console>:25

scala> rdd.collect
res14: Array[User] = Array(User(谢清照,21), User(朱玮琦,20))
           

2.4 DataFrame 和 DataSet 转换

  • DataFrame

    其实是

    DataSet

    的特例,所以它们之间是可以互相转换的。

2.4.1 DataFrame 转换为 DataSet

# DataFrame
scala>  val df = spark.read.json("input/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

# 样例类
scala> case class User(age:Long, username:String)
defined class User

# 转换为 DataSet
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [age: bigint, username: string]
           

2.4.2 DataSet 转换为 DataFrame

# Dataset[User] => DataFrame 

# 转换为 DataFrame, 只保留了结构,去掉了 User 的数据类型
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
           

2.5 RDD、DataFrame、DataSet 三者的关系

SparkSQL

Spark

为我们提供了两个新的抽象,分别是

DataFrame

DataSet

。他们和

RDD

有什么区别呢?首先从版本的产生上来看:

  • Spark1.0 => RDD
  • Spark1.3 => DataFrame
  • Spark1.6 => Dataset

如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的

Spark

版本中,

DataSet

有可能会逐步取代

RDD

DataFrame

成为唯一的 API 接口。

2.5.1 三者的区别

2.5.1.1 RDD

  • RDD

    一般和

    spark mllib

    同时使用
  • RDD

    不支持

    sparksql

    操作

2.5.1.2 DataFrame

  • RDD

    Dataset

    不同,

    DataFrame

    每一行的类型固定为

    Row

    ,每一列的值没法直接访问,只有通过解析才能获取各个字段的值。
  • DataFrame

    DataSet

    一般不与

    spark mllib

    同时使用
  • DataFrame

    DataSet

    均支持

    SparkSQL

    的操作,比如

    select

    groupby

    之类,还能注册临时表/视窗,进行

    sql

    语句操作。
  • DataFrame

    DataSet

    支持一些特别方便的保存方式,比如保存成

    csv

    ,可以带上表头,这样每一列的字段名一目了然(后面专门讲解)。

2.5.1.3 DataSet

  • Dataset

    DataFrame

    拥有完全相同的成员函数,区别只是每一行的数据类型不同。

    DataFrame

    其实就是

    DataSet

    的一个特例

    type DataFrame = Dataset[Row]

  • DataFrame

    也可以叫

    Dataset[Row]

    ,每一行的类型是

    Row

    ,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的

    getAS

    方法或者共性中的第七条提到的模式匹配拿出特定字段。而

    Dataset

    中,每一行是什么类型是不一定的,在自定义了

    case class

    之后可以很自由的获得每一行的信息
    1.SparkSQL基础—Spark SQL概述、Spark SQL核心编程—DataFrame、DataSet第1章 Spark SQL概述第2章 Spark SQL核心编程

声明:本文是学习时记录的笔记,如有侵权请告知删除!

原视频地址:https://www.bilibili.com/video/BV11A411L7CK

继续阅读