Apache Spark SQL
Spark SQL是⽤于结构化数据处理的⼀个模块。同Spark RDD 不同地⽅在于Spark SQL的API可以给Spark计算引擎提供更多地 信息,例如:数据结构、计算算⼦等。在内部Spark可以通过这些信息有针对对任务做优化和调整。这⾥有⼏种⽅式和Spark SQL进⾏交互,例如Dataset API和SQL等,这两种API可以混合使⽤。Spark SQL的⼀个⽤途是执⾏SQL查询。 Spark SQL还可⽤于从现有Hive安装中读取数据。从其他编程语⾔中运⾏SQL时,结果将作为Dataset/DataFrame返回,使⽤命令 ⾏或JDBC / ODBC与SQL接⼝进⾏交互。
Dataset是⼀个分布式数据集合在Spark 1.6提供⼀个新的接⼝,Dataset提供RDD的优势(强类型,使⽤强⼤的lambda函 数)以及具备了Spark SQL执⾏引擎的优点。Dataset可以通过JVM对象构建,然后可以使⽤转换函数等(例如:map、flatMap、filter等),⽬前Dataset API⽀持Scala和Java ⽬前Python对Dataset⽀持还不算完备。
DataFrame是命名列的数据集,他在概念是等价于关系型数据库。DataFrames可以从很多地⽅构建,⽐
如说结构化数据⽂ 件、hive中的表或者外部数据库,使⽤Dataset[row]的数据集,可以理解DataFrame就是⼀个Dataset[Row]
一、SparkSession
Spark中所有功能的⼊⼝点是SparkSession类。要创建基本的SparkSession,只需使用SparkSession.builder():
1、依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
2、Drvier程序
//1.创建SparkSession
val spark = SparkSession.builder()
.appName("hellosql")
.master("local[10]") .getOrCreate()
//2.引⼊该隐试转换 主要是 将 RDD 转换为 DataFrame/Dataset
import spark.implicits._
spark.sparkContext.setLogLevel("FATAL")
//3.创建Dataset/DataFrame
//4.sparkSQL提供的算子或脚本
//5.将SQL结果写入到外围系统
//6.关闭spark
spark.stop()
二、创建Dataset/DataFrame
1、Dataset
Dataset与RDD类似,但是它们不使⽤Java序列化或Kryo,⽽是使⽤专⽤的Encoder来序列化对象以便通过⽹络进⾏处理或传输。虽然Encoder和标准序列化都负责将对象转换为字节,但Encoder是动态⽣成的代码,并使⽤⼀种格式,允许Spark执⾏许多操作,如过滤,排序和散列,⽽⽆需将字节反序列化为对
象。
①、case-class
case class Person(id:Int,name:String,age:Int,sex:Boolean)
val dataset: Dataset[Person] =List(Person(1,"zhangsan",18,true),Person(2,"wangwu",28,true)).toDS()
dataset.select($"id",$"name").show()
②、Tuple元组
val dataset: Dataset[(Int,String,Int,Boolean)] = List((1,"zhangsan",18,true),
(2,"wangwu",28,true)).toDS()
dataset.select($"_1",$"_2").show()
或者
dataset.selectExpr("_1 as id","_2 as name","(_3 * 10) as age").show()
③、json数据
{"name":"张三","age":18}
{"name":"lisi","age":28}
{"name":"wangwu","age":38}
case class Person(id:Int,name:String,age:Int,sex:Boolean)
val dataset = spark.read.json("D:///Persion.json").as[Person]
dataset.show()
④、rdd
a、元组
val userRDD = spark.sparkContext.makeRDD(List((1,"张三",true,18,15000.0)))
userRDD.toDS().show()
+---+----+----+---+-------+
| _1| _2| _3| _4| _5|
+---+----+----+---+-------+
| 1|张三|true| 18|15000.0|
+---+----+----+---+-------+
b、case-class
val userRDD = spark.sparkContext.makeRDD(List(User(1,"张三",true,18,15000.0)))
userRDD.toDS().show()
+---+----+----+---+-------+
| id|name| sex|age| salary|
+---+----+----+---+-------+
| 1|张三|true| 18|15000.0|
+---+----+----+---+-------+
2、DataFrame
Data Frame是命名列的数据集,他在概念是等价于关系型数据库。DataFrames可以从很多地⽅构建,⽐如说结构化数据⽂ 件、hive中的表或者外部数据库,使⽤Dataset[row]的数据集,可以理解DataFrame就是⼀个Dataset[Row].
①、json⽂件
val frame = spark.read.json("file:///f:/person.json")
frame.show()
②、case-class
List(Person("zhangsan",18),Person("王五",20)).toDF("uname","uage").show()
③、Tuple元组
List(("zhangsan",18),("王五",20)).toDF("name","age").show()
④、RDD转换
a、Row
val userRDD = spark.sparkContext.makeRDD(List((1,"张三",true,18,15000.0)))
.map(t=>Row(t._1,t._2,t._3,t._4,t._5))
var schema=new StructType()
.add("id","int")
.add("name","string")
.add("sex","boolean")
.add("age","int")
.add("salary","double")
spark.createDataFrame(userRDD,schema).show()
+---+----+----+---+-------+
| id|name| sex|age| salary|
+---+----+----+---+-------+
| 1|张三|true| 18|15000.0|
+---+----+----+---+-------+
b、Javabean
val userRDD = spark.sparkContext.makeRDD(List(new User(1,"张三",true,18,15000.0))) spark.createDataFrame(userRDD,classOf[User]).show()
提示 :这⾥的 User 须是JavaBean对象。如果是Scala的类,⽤户需要额外提供getXxx⽅法(没这个必要)
+---+----+----+---+-------+
| id|name| sex|age| salary|
+---+----+----+---+-------+
| 1|张三|true| 18|15000.0|
+---+----+----+---+-------+
c、case-class
val userRDD = spark.sparkContext.makeRDD(List(User(1,"张三",true,18,15000.0)))
spark.createDataFrame(userRDD).show()
+---+----+----+---+-------+
| id|name| sex|age| salary|
+---+----+----+---+-------+
| 1|张三|true| 18|15000.0|
+---+----+----+---+-------+
d、tuple元组
val userRDD = spark.sparkContext.makeRDD(List((1,"张三",true,18,15000.0)))
spark.createDataFrame(userRDD).show()
+---+----+----+---+-------+
| _1| _2| _3| _4| _5|
+---+----+----+---+-------+
| 1|张三|true| 18|15000.0|
+---+----+----+---+-------+
三、Dataset/DataFrame API操作
准备数据
1,Michael,false,29,2000
5,Lisa,false,19,1000
3,Justin,true,19,1000
2,Andy,true,30,5000
4,Kaine,false,20,5000
尝试将⽂本数据转变为DataFrame
case class User01(id:Int,name:String,sex:Boolean,age:Int,salary:Double)
var userRDD:RDD[User01]=userlines.map(line=>line.split(","))
.map(ts=>User01(ts(0).toInt,ts(1),ts(2).toBoolean,ts(3).toInt,ts(4).toDouble))
val userDataFrame = userRDD.toDF()
1、printSchema
打印创建的表结构信息
userDataFrame.printSchema()
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- sex: boolean (nullable = false)
|-- age: integer (nullable = false)
|-- salary: double (nullable = false)
2、show
默认将dataframe或者是dataset中前20⾏的数据打印在控制台,⼀般⽤于测试。
userDataFrame.show()
+---+-------+-----+---+------+
| id| name| sex|age|salary|
+---+-------+-----+---+------+
| 1|Michael|false| 29|2000.0|
| 2| Andy| true| 30|5000.0|
| 3| Justin| true| 19|1000.0|
| 4| Kaine|false| 20|5000.0|
| 5| Lisa|false| 19|1000.0|
+---+-------+-----+---+------+
例如只查询前2⾏ userDataFrame.show(2)
+---+-------+-----+---+------+
| id| name| sex|age|salary|
+---+-------+-----+---+------+
| 1|Michael|false| 29|2000.0|
| 2| Andy| true| 30|5000.0|
+---+-------+-----+---+------+
3、select
等价于sql脚本的select语句,⽤于过滤、投影出需要的字段信息。⽤户可以直接给列名,但是不⽀持计
算
userDataFrame.select("id","name","sex","age","salary").show()
+---+-------+-----+---+------+
| id| name| sex|age|salary|
+---+-------+-----+---+------+
| 1|Michael|false| 29|2000.0|
| 2| Andy| true| 30|5000.0|
| 3| Justin| true| 19|1000.0|
| 4| Kaine|false| 20|5000.0|
| 5| Lisa|false| 19|1000.0|
+---+-------+-----+---+------+
⽤户可以给select传递Cloumn,这样⽤户可以针对Column做⼀些简单的计算
userDataFrame.select(new Column("id"),new Column("name"),new Column("age"),new Column("salary"),new Column("salary").*(12)).show()
简化写法
userDataFrame.select($"id",$"name",$"age",$"salary",$"salary" * 12) .show()
+---+-------+---+------+-------------+
| id| name|age|salary|(salary * 12)|
+---+-------+---+------+-------------+
| 1|Michael| 29|2000.0| 24000.0|
| 2| Andy| 30|5000.0| 60000.0|
| 3| Justin| 19|1000.0| 12000.0|
| 4| Kaine| 20|5000.0| 60000.0|
| 5| Lisa| 19|1000.0| 12000.0|
+---+-------+---+------+-------------+
4、selectExpr
允许直接给字段名,并且基于字段名指定⼀些常⻅字符串SQL运算符。
userDataFrame.selectExpr("id","name || '⽤户'","salary * 12 as annal_salary").show()
+---+------------------+------------+
| id|concat(name, ⽤户)|annal_salary|
+---+------------------+------------+
| 1 | Michael⽤户| 24000.0|
| 2 | Andy⽤户| 60000.0|
| 3 | Justin⽤户| 12000.0|
| 4 | Kaine⽤户| 60000.0|
| 5 | Lisa⽤户| 12000.0|
+---+------------------+------------+
5、where
类似SQL中的where,主要⽤于过滤查询结果。该算⼦可以传递Conditiion或者ConditionExp
userDataFrame.select($"id",$"name",$"age",$"salary",$"salary" * 12)
.where($"name" like "%a%")
.show()
等价写法
userDataFrame.select($"id",$"name",$"age",$"salary",$"salary" * 12).where("name like '%a%'") .show()
注意spark中别名不要出现中⽂,如果出现中⽂,在 where表达式 中存在bug
userDataFrame.select($"id",$"name",$"age",$"salary",$"salary" * 12 as "annal_salary")
.where("(name like '%a%') and (annal_salary > 12000)" )
.show() //正常
userDataFrame.select($"id",$"name",$"age",$"salary",$"salary" * 12 as "年薪")
.where("(name like '%a%') and ('年薪' > 12000)" )
.show()//错误
userDataFrame.select($"id",$"name",$"age",$"salary",$"salary" * 12 as "年薪")
.where($"name" like "%a%" and $"年薪" > 12000 )
.show() //正常
6、withColumn
可以给dataframe添加⼀个字段信息
userDataFrame.select($"id",$"name",$"age",$"salary",$"sex")
.withColumn("年薪",$"salary" * 12)
.show()
+---+-------+---+------+-----+-------+
| id| name|age|salary| sex| 年薪|
+---+-------+---+------+-----+-------+
| 1|Michael| 29|2000.0|false|24000.0|
| 2| Andy| 30|5000.0| true|60000.0|
| 3| Justin| 19|1000.0| true|12000.0|
| 4| Kaine| 20|5000.0|false|60000.0|
| 5| Lisa| 19|1000.0|false|12000.0|
+---+-------+---+------+-----+-------+
7、withColumnRenamed
修改现有字段名字
userDataFrame.select($"id",$"name",$"age",$"salary",$"sex")
.withColumn("年薪",$"salary" * 12)
.withColumnRenamed("年薪","annal_salary")
.withColumnRenamed("id","uid")
.show()
+---+-------+---+------+-----+------------+
|uid| name|age|salary| sex|annal_salary|
+---+-------+---+------+-----+------------+
| 1|Michael| 29|2000.0|false| 24000.0|
| 2| Andy| 30|5000.0| true| 60000.0|
| 3| Justin| 19|1000.0| true| 12000.0|
| 4| Kaine| 20|5000.0|false| 60000.0|
| 5| Lisa| 19|1000.0|false| 12000.0|
+---+-------+---+------+-----+------------+
8、groupBy
和SQL中的 group by ⽤法⼀直,通常和⼀些聚合函数⼀起使⽤。
userDataFrame.select($"id",$"name",$"age",$"salary",$"sex") .groupBy($"sex") .mean("salary")//计算平均值等价avg
.show()
+-----+------------------+
| sex| avg(salary)|
+-----+------------------+
| true| 3000.0|
|false|2666.6666666666665|
+-----+------------------+
类似还有max、min、sum、avg算⼦,但是如果使⽤算⼦,后⾯跟⼀个聚合函数。⼀般来讲⽤户可以使
⽤ agg 算⼦实现多个聚合操作。
9、agg
必须跟在groupBy后⾯,调⽤多个聚合函数,实现对某些字段的求和、最⼤值、最⼩值、平均值等。
import org.apache.spark.sql.functions._
userDataFrame.select($"id",$"name",$"age",$"salary",$"sex") .groupBy($"sex") .agg(sum("salary") as "sum", avg("salary") as "avg",max("salary") as
"max",min("salary") as "min") .show()
+-----+------+------------------+------+------+
| sex| sum| avg| max| min|
+-----+------+------------------+------+------+
| true|6000.0| 3000.0|5000.0|1000.0|
|false|8000.0|2666.6666666666665|5000.0|1000.0|
+-----+------+------------------+------+------+
或者
userDataFrame.select($"id",$"name",$"age",$"salary",$"sex")
.groupBy($"sex")
.agg("salary"->"sum","salary"->"avg","salary"->"max","salary"->"min")
.show()
10、开窗函数
使⽤over完成开窗,操作。
import org.apache.spark.sql.functions._
val w = Window.partitionBy("sex")
.orderBy($"salary" desc)
.rowsBetween(Window.unboundedPreceding,Window.currentRow)
userDataFrame.select($"id",$"name",$"age",$"salary",$"sex") .withColumn("salary_rank",dense_rank() over (w ))
.show()
select id,name,...,dense_rank() over(partition by sex order by salary desc rows
between unbounded preceding and current row) from t_user
11、cube
实现多维度分析计算
import org.apache.spark.sql.functions._
spark.sparkContext.makeRDD(List((110,50,80),(120,60,95),(120,50,96)))
.toDF("height","weight","score") .cube($"height",$"weight") .agg(avg("score"),max("score"))
.show()
+------+------+-----------------+----------+
|height|weight| avg(score)|max(score)|
+------+------+-----------------+----------+
| 110| 50| 80.0| 80|
| 120| null| 95.5| 96|
| 120| 60| 95.0| 95|
| null| 60| 95.0| 95|
| null| null|90.33333333333333| 96|
| 120| 50| 96.0| 96|
| 110| null| 80.0| 80|
| null| 50| 88.0| 96|
+------+------+-----------------+----------+
12、pivot
该算⼦引⾃于SqlServer,主要⽤于实现⾏转列操作。
case class UserCost(id:Int,category:String,cost:Double)
var userCostRDD=spark.sparkContext.parallelize(List(
UserCost(1,"电⼦类",100),
UserCost(1,"电⼦类",20),
UserCost(1,"⺟婴类",100),
UserCost(1,"⽣活⽤品",100),
UserCost(2,"美⻝",79),
UserCost(2,"电⼦类",80),
UserCost(2,"⽣活⽤品",100)
))
var categories=userCostRDD.map(uc=>uc.category).distinct.collect()
userCostRDD.toDF("id","category","cost") .groupBy("id") .pivot($"category",categories) .sum("cost") .show()
+---+------+--------+------+----+
| id|⺟婴类|⽣活⽤品|电⼦类|美⻝|
+---+------+--------+------+----+
| 1| 100.0| 100.0| 120.0|null|
| 2| null| 100.0| 80.0|79.0|
+---+------+--------+------+----+
13、na
提供了对null值字段数据的⾃动填充技术。
case class User01(id:Int,name:String,sex:Boolean,age:Int,salary:Double)
case class UserCost(id:Int,category:String,cost:Double)
var userCostRDD=spark.sparkContext.parallelize(List(
UserCost(1,"电⼦类",100),
UserCost(1,"电⼦类",20),
UserCost(1,"⺟婴类",100),
UserCost(1,"⽣活⽤品",100),
UserCost(2,"美⻝",79),
UserCost(2,"电⼦类",80),
UserCost(2,"⽣活⽤品",100)
))
var categories=userCostRDD.map(uc=>uc.category).distinct.collect()
userCostRDD.toDF("id","category","cost") .groupBy("id") .pivot($"category",categories) .sum("cost") .na.fill(0.0) .show()
+---+------+--------+------+----+
| id|⺟婴类|⽣活⽤品|电⼦类|美⻝|
+---+------+--------+------+----+
| 1| 100.0| 100.0| 120.0| 0.0|
| 2| 0.0| 100.0| 80.0|79.0|
+---+------+--------+------+----+
其中fill表示填充。
var userCostRDD=spark.sparkContext.parallelize(List(
UserCost(1,"电⼦类",100),
UserCost(1,"电⼦类",20),
UserCost(1,"⺟婴类",100),
UserCost(1,"⽣活⽤品",100),
UserCost(2,"美⻝",79),
UserCost(2,"电⼦类",80),
UserCost(2,"⽣活⽤品",100)
))
var categories=userCostRDD.map(uc=>uc.category).distinct.collect()
userCostRDD.toDF("id","category","cost") .groupBy("id") .pivot($"category",categories) .sum("cost") .na.fill(Map("美⻝"-> -1,"⺟婴类"-> 1000))
.show()
+---+------+--------+------+----+
| id|⺟婴类|⽣活⽤品|电⼦类|美⻝|
+---+------+--------+------+----+
| 1| 100.0| 100.0| 120.0|-1.0|
| 2|1000.0| 100.0| 80.0|79.0|
+---+------+--------+------+----+
⼀般na后⾯还可以跟drop算⼦,可以删除⼀些null值的⾏
var userCostRDD=spark.sparkContext.parallelize(List(
UserCost(1,"电⼦类",100),
UserCost(1,"电⼦类",20),
UserCost(1,"⺟婴类",100),
UserCost(1,"⽣活⽤品",100),
UserCost(2,"美⻝",79),
UserCost(2,"电⼦类",80),
UserCost(2,"⽣活⽤品",100)
))
var categories=userCostRDD.map(uc=>uc.category).distinct.collect()
userCostRDD.toDF("id","category","cost") .groupBy("id") .pivot($"category",categories) .sum("cost")
//.na.drop(4)//如果少于四个⾮空,删除
// .na.drop("any")//只要有⼀个为null,就删除,`all`都为null才删除
.na.drop(List("美⻝","⺟婴类"))//如果指定列出null、删除
.show()
14、join
和数据的join类似。
case class User01(id:Int,name:String,sex:Boolean,age:Int,salary:Double)
case class UserCost(id:Int,category:String,cost:Double)
var userCostRDD=spark.sparkContext.parallelize(List(
UserCost(1,"电脑配件",100),
UserCost(1,"⺟婴⽤品",100),
UserCost(1,"⽣活⽤品",100),
UserCost(2,"居家美⻝",79),
UserCost(2,"消费电⼦",80),
UserCost(2,"⽣活⽤品",100)
))
var userRDD=spark.sparkContext.parallelize(List(
User01(1,"张晓三",true,18,15000),
User01(2,"李晓四",true,18,18000),
User01(3,"王晓五",false,18,10000)
))
val categories = userCostRDD.map(_.category).distinct().collect()
userCostRDD.toDF("id","category","cost").groupBy("id") .pivot($"category",categories) .sum("cost") .join(userRDD.toDF("id","name","sex","age","salary"),"id") .na.fill(0.0) .show()
userCostRDD.toDF("id","category","cost").as("c").groupBy("id")
.pivot($"category",categories)
.sum("cost")
.join(userRDD.toDF("id","name","sex","age","salary").as("u"),$"c.id"===$"u.id","LEFT_O
UTER")
.na.fill(0.0)
.show()
15、Duplicates
删除记录中的重复记录,类似sql中distinct关键字
var userCostDF=spark.sparkContext.parallelize(List(
UserCost(1,"电脑配件",100),
UserCost(1,"⺟婴⽤品",100),
UserCost(1,"⽣活⽤品",100),
UserCost(2,"居家美⻝",79),
UserCost(2,"消费电⼦",80),
UserCost(2,"⽣活⽤品",100)
)).toDF()
userCostDF.dropDuplicates().show()
必须所有字段出现重复,才会删除重复记录。
+---+--------+-----+
| id|category| cost|
+---+--------+-----+
| 2|居家美⻝| 79.0|
| 1|电脑配件|100.0|
| 1|⽣活⽤品|100.0|
| 2|⽣活⽤品|100.0|
| 1|⺟婴⽤品|100.0|
| 2|消费电⼦| 80.0|
+---+--------+-----+
例如针对于category去重
userCostDF.dropDuplicates("category").show()
+---+--------+-----+
| id|category| cost|
+---+--------+-----+
| 2|居家美⻝| 79.0|
| 1|⺟婴⽤品|100.0|
| 1|⽣活⽤品|100.0|
| 2|消费电⼦| 80.0|
| 1|电脑配件|100.0|
+---+--------+-----+
16、drop
删除指定列信息
var userCostDF=spark.sparkContext.parallelize(List(
UserCost(1,"电脑配件",100),
UserCost(1,"⺟婴⽤品",100),
UserCost(1,"⽣活⽤品",100),
UserCost(2,"居家美⻝",79),
UserCost(2,"消费电⼦",80),
UserCost(2,"⽣活⽤品",100)
)).toDF()
userCostDF.drop("cost","id").dropDuplicates().show()
+--------+
|category|
+--------+
|居家美⻝| |⺟婴⽤品| |⽣活⽤品| |消费电⼦| |电脑配件|
+--------+
17、orderBy
类似SQL中的orderBy⽤于指定排序字段
val df=spark.sparkContext.parallelize(List((1,"TV,GAME"),
(2,"SLEEP,FOOTBALL"))).toDF("id","hobbies")
df.orderBy($"id" asc) .show()
+---+--------------+
| id| hobbies|
+---+--------------+
| 1| TV,GAME|
| 2|SLEEP,FOOTBALL|
+---+--------------+
18、limit
类似于数据库的分⻚语句,但是只能限定条数,类似RDD中take(n)
var userCostDF=spark.sparkContext.parallelize(List(
UserCost(1,"电脑配件",100),
UserCost(1,"⺟婴⽤品",100),
UserCost(1,"⽣活⽤品",100),
UserCost(2,"居家美⻝",79),
UserCost(2,"消费电⼦",80),
UserCost(2,"⽣活⽤品",100)
)).toDF()
userCostDF.orderBy($"id" asc).limit(3).show()
+---+--------+-----+
| id|category| cost|
+---+--------+-----+
| 1|电脑配件|100.0|
| 1|⺟婴⽤品|100.0|
| 1|⽣活⽤品|100.0|
+---+--------+-----+
19、filter
类似于where,过滤掉⼀些不符合要求的数据集,⽤户可以给表达式、过滤条件、或者是函数
var userCostDF=spark.sparkContext.parallelize(List(
UserCost(1,"电脑配件",100),
UserCost(1,"⺟婴⽤品",100),
UserCost(1,"⽣活⽤品",100),
UserCost(2,"居家美⻝",79),
UserCost(2,"消费电⼦",80),
UserCost(2,"⽣活⽤品",100)
)).toDF()
userCostDF.orderBy($"id" asc).limit(3).filter("category != '⺟婴⽤品' ").show()
+---+--------+-----+
| id|category| cost|
+---+--------+-----+
| 1|电脑配件|100.0|
| 1|⽣活⽤品|100.0|
+---+--------+-----+
等价1
userCostDF.orderBy($"id" asc).limit(3).filter($"category" =!="⺟婴⽤品").show()
等价2
userCostDF.orderBy($"id" asc).limit(3).filter(row=> ! row.getAs[String] ("category").equals("⺟婴⽤品")).show()
20、map
类似RDD中map、处理DataFrame中的Row类型。
var userDF=spark.sparkContext.parallelize(List(
User01(1,"张晓三",true,18,15000),
User01(2,"李晓四",true,18,18000),
User01(3,"王晓五",false,18,10000)
)).toDF()
val dataset:Dataset[(Int,String,Double)] = userDF.map(row => (row.getAs[Int]("id"),
row.getAs[String]("name"), row.getAs[Double]("salary")))
dataset.toDF("id","name","salary") .show()
+---+------+-------+
| _1| _2| _3|
+---+------+-------+
| 1|张晓三|15000.0| | 2|李晓四|18000.0| | 3|王晓五|10000.0|
+---+------+-------+
21、rdd
可以将Dataset[T]或者DataFrame类型的数据变成RDD[T]或者是RDD[Row]
var userDF=spark.sparkContext.parallelize(List(
User01(1,"张晓三",true,18,15000),
User01(2,"李晓四",true,18,18000),
User01(3,"王晓五",false,18,10000)
)).toDF()
val dataset:Dataset[(Int,String,Double)] = userDF.map(row => (row.getAs[Int]("id"),
row.getAs[String]("name"), row.getAs[Double]("salary")))
dataset.rdd.foreach(t=>println(t._1+" "+t._2+" "+t._3))
四、Dataset/DataFrame SQL
数据准备
t_user
Michael,29,20000,true,MANAGER,1
Andy,30,15000,true,SALESMAN,1
Justin,19,8000,true,CLERK,1
Kaine,20,20000,true,MANAGER,2
Lisa,19,18000,false,SALESMAN,2
t_dept
1,研发
2,设计
3,产品
将以上数据上传到HDFS⽂件系统
//1.创建SparkSession
val spark = SparkSession.builder()
.appName("hellosql") .master("local[10]") .getOrCreate()
//引⼊改隐试转换 主要是 将 集合、RDD 转换为 DataFrame/Dataset
import spark.implicits._
spark.sparkContext.setLogLevel("FATAL")
val userDF = spark.sparkContext.textFile("hdfs://CentOS:9000/demo/user") .map(line => line.split(","))
.map(ts => User(ts(0), ts(1).toInt, ts(2).toDouble, ts(3).toBoolean, ts(4),
ts(5).toInt))
.toDF()
val deptDF = spark.sparkContext.textFile("hdfs://CentOS:9000/demo/dept") .map(line => line.split(","))
.map(ts => Dept(ts(0).toInt, ts(1)))
.toDF()
userDF.show()
deptDF.show()
//关闭SparkSession
spark.close()
注册视图
userDF.createOrReplaceTempView("t_user")
deptDF.createOrReplaceTempView("t_dept")
执⾏SQL
var sql=
"""
select *, salary * 12 as annual_salary from t_user
"""
spark.sql(sql).show()
1、单表查询
select *, salary * 12 as annual_salary from t_user
+-------+---+-------+-----+--------+------+-------------+
| name|age| salary| sex| job|deptNo|annual_salary|
+-------+---+-------+-----+--------+------+-------------+
|Michael| 29|20000.0| true| MANAGER| 1| 240000.0|
| Andy| 30|15000.0| true|SALESMAN| 1| 180000.0|
| Justin| 19| 8000.0| true| CLERK| 1| 96000.0|
| Kaine| 20|20000.0| true| MANAGER| 2| 240000.0|
| Lisa| 19|18000.0|false|SALESMAN| 2| 216000.0|
+-------+---+-------+-----+--------+------+-------------+
2、like模糊查询
select *, salary * 12 as annual_salary from t_user where name like '%a%'
+-------+---+-------+-----+--------+------+-------------+
| name|age| salary| sex| job|deptNo|annual_salary|
+-------+---+-------+-----+--------+------+-------------+
|Michael| 29|20000.0| true| MANAGER| 1| 240000.0|
| Kaine| 20|20000.0| true| MANAGER| 2| 240000.0|
| Lisa| 19|18000.0|false|SALESMAN| 2| 216000.0|
+-------+---+-------+-----+--------+------+-------------+
3、排序查询
select * from t_user order by deptNo asc,salary desc
+-------+---+-------+-----+--------+------+
| name|age| salary| sex| job|deptNo|
+-------+---+-------+-----+--------+------+
|Michael| 29|20000.0| true| MANAGER| 1|
| Andy| 30|15000.0| true|SALESMAN| 1|
| Justin| 19| 8000.0| true| CLERK| 1|
| Kaine| 20|20000.0| true| MANAGER| 2|
| Lisa| 19|18000.0|false|SALESMAN| 2|
+-------+---+-------+-----+--------+------+
4、limit查询
select * from t_user order by deptNo asc,salary desc limit 3
+-------+---+-------+----+--------+------+
| name|age| salary| sex| job|deptNo|
+-------+---+-------+----+--------+------+
|Michael| 29|20000.0|true| MANAGER| 1|
| Andy| 30|15000.0|true|SALESMAN| 1|
| Justin| 19| 8000.0|true| CLERK| 1|
+-------+---+-------+----+--------+------+
5、分组查询
select deptNo,avg(salary) avg from t_user group by deptNo
+------+------------------+
|deptNo| avg|
+------+------------------+
| 1|14333.333333333334|
| 2| 19000.0|
+------+------------------+
6、Having过滤
select deptNo,avg(salary) avg from t_user group by deptNo having avg > 15000
+------+-------+
|deptNo| avg|
+------+-------+
| 2|19000.0|
+------+-------+
7、case-when
select deptNo,name,salary,sex,
(case sex when true then '男' else '⼥' end ) as user_sex,
(case when salary >= 20000 then '⾼' when salary >= 15000 then '中' else
'低' end ) as level
from t_user
+------+-------+-------+-----+--------+-----+
|deptNo| name| salary| sex|user_sex|level|
+------+-------+-------+-----+--------+-----+
| 1|Michael|20000.0| true| 男| ⾼|
| 1| Andy|15000.0| true| 男| 中|
| 1| Justin| 8000.0| true| 男| 低|
| 2| Kaine|20000.0| true| 男| ⾼|
| 2| Lisa|18000.0|false| ⼥| 中|
+------+-------+-------+-----+--------+-----+
8、行转列
val coursedf = spark.sparkContext.parallelize(List(
(1, "语⽂", 100),
(1, "数学", 100),
(1, "英语", 100),
(2, "数学", 79),
(2, "语⽂", 80),
(2, "英语", 100)
)).toDF("id","course","score")
coursedf.createOrReplaceTempView("t_course")
select id,
sum(case course when '语⽂' then score else 0 end) as chinese,
sum(case course when '数学' then score else 0 end) as math,
sum(case course when '英语' then score else 0 end) as english
from t_course
group by id
+---+-------+----+-------+
| id|chinese|math|english|
+---+-------+----+-------+
| 1| 100| 100| 100|
| 2| 80| 79| 100|
+---+-------+----+-------+
使⽤pivot实现⾏转列
select * from t_course pivot(max(score) for course in ('数学','语⽂','英语'))
+---+----+----+----+
| id|数学|语⽂|英语|
+---+----+----+----+
| 1| 100| 100| 100|
| 2| 79| 80| 100|
+---+----+----+----+
9、表连接
select u.*,d.dname from t_user u left join t_dept d on u.deptNo=d.deptNo
+-------+---+-------+-----+--------+------+-----+
| name|age| salary| sex| job|deptNo|dname|
+-------+---+-------+-----+--------+------+-----+
|Michael| 29|20000.0| true| MANAGER| 1| 研发|
| Andy| 30|15000.0| true|SALESMAN| 1| 研发|
| Justin| 19| 8000.0| true| CLERK| 1| 研发|
| Kaine| 20|20000.0| true| MANAGER| 2| 设计|
| Lisa| 19|18000.0|false|SALESMAN| 2| 设计|
+-------+---+-------+-----+--------+------+-----+
10、子查询
select *,(select sum(t1.salary) from t_user t1 where (t1.deptNo = t2.deptNo) group by
t1.deptNo) as total from t_user t2 left join t_dept d on t2.deptNo=d.deptNo order by
t2.deptNo asc,t2.salary desc
+-------+---+-------+-----+--------+------+------+-----+-------+
| name|age| salary| sex| job|deptNo|deptNo|dname| total|
+-------+---+-------+-----+--------+------+------+-----+-------+
|Michael| 29|20000.0| true| MANAGER| 1| 1| 研发|43000.0|
| Andy| 30|15000.0| true|SALESMAN| 1| 1| 研发|43000.0|
| Justin| 19| 8000.0| true| CLERK| 1| 1| 研发|43000.0|
| Kaine| 20|20000.0| true| MANAGER| 2| 2| 设计|38000.0|
| Lisa| 19|18000.0|false|SALESMAN| 2| 2| 设计|38000.0|
+-------+---+-------+-----+--------+------+------+-----+-------+
11、开窗函数
select *,rank() over(partition by t2.deptNo order by t2.salary desc) as rank from
t_user t2 left join t_dept d on t2.deptNo=d.deptNo order by t2.deptNo asc,t2.salary
desc
+-------+---+-------+-----+--------+------+------+-----+----+
| name|age| salary| sex| job|deptNo|deptNo|dname|rank|
+-------+---+-------+-----+--------+------+------+-----+----+
|Michael| 29|20000.0| true| MANAGER| 1| 1| 研发| 1|
| Andy| 30|15000.0| true|SALESMAN| 1| 1| 研发| 2|
| Justin| 19| 8000.0| true| CLERK| 1| 1| 研发| 3|
| Kaine| 20|20000.0| true| MANAGER| 2| 2| 设计| 1|
| Lisa| 19|18000.0|false|SALESMAN| 2| 2| 设计| 2|
+-------+---+-------+-----+--------+------+------+-----+----+
cube分析
select deptNo,job,max(salary),avg(salary) from t_user group by deptNo,job with cube
+------+--------+-----------+------------------+
|deptNo| job|max(salary)| avg(salary)|
+------+--------+-----------+------------------+
| 1|SALESMAN| 15000.0| 15000.0|
| 1| null| 20000.0|14333.333333333334|
| null| null| 20000.0| 16200.0|
| null|SALESMAN| 18000.0| 16500.0|
| 1| CLERK| 8000.0| 8000.0|
| 2| MANAGER| 20000.0| 20000.0|
| 2| null| 20000.0| 19000.0|
| null| MANAGER| 20000.0| 20000.0|
| null| CLERK| 8000.0| 8000.0|
| 2|SALESMAN| 18000.0| 18000.0|
| 1| MANAGER| 20000.0| 20000.0|
+------+--------+-----------+------------------+
等价写法
select deptNo,job,max(salary),avg(salary) from t_user group by cube(deptNo,job)
+------+--------+-----------+------------------+
|deptNo| job|max(salary)| avg(salary)|
+------+--------+-----------+------------------+
| 1|SALESMAN| 15000.0| 15000.0|
| 1| null| 20000.0|14333.333333333334|
| null| null| 20000.0| 16200.0|
| null|SALESMAN| 18000.0| 16500.0|
| 1| CLERK| 8000.0| 8000.0|
| 2| MANAGER| 20000.0| 20000.0|
| 2| null| 20000.0| 19000.0|
| null| MANAGER| 20000.0| 20000.0|
| null| CLERK| 8000.0| 8000.0|
| 2|SALESMAN| 18000.0| 18000.0|
| 1| MANAGER| 20000.0| 20000.0|
+------+--------+-----------+------------------+
五、自定义函数
spark内置很多函数都定义在 org.apache.spark.sql.functions 单例对象中,如果不满⾜实际需求,⼤家可以考虑对Spark函数库进⾏扩展。
1、单行函数
①、定义函数
val sexFunction=(sex:Boolean)=> sex match {
case true => "男"
case false => "⼥"
case default => "位置" }
val commFunction=(age:Int,salary:Double)=> {
if(age>=30){
salary+500
}else{
salary
}
}
②、注册⽤户的函数
spark.udf.register("sexFunction",sexFunction)
spark.udf.register("commFunction",commFunction)
③、测试使⽤函数
select name,sexFunction(sex),age,salary,job,commFunction(age,salary) as comm from
t_user
+-------+--------------------+---+-------+--------+-------+
| name|UDF:sexFunction(sex)|age| salary| job| comm|
+-------+--------------------+---+-------+--------+-------+
|Michael| 男| 29|20000.0| MANAGER|20000.0|
| Andy| 男| 30|15000.0|SALESMAN|15500.0|
| Justin| 男| 19| 8000.0| CLERK| 8000.0|
| Kaine| 男| 20|20000.0| MANAGER|20000.0|
| Lisa| ⼥| 19|18000.0|SALESMAN|18000.0|
+-------+--------------------+---+-------+--------+-------+
2、聚合函数
①、Untyped
a、定义聚合函数
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer,
UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StructField,
StructType}
object CustomUserDefinedAggregateFunction extends UserDefinedAggregateFunction{
//接收数据类型是什么
override def inputSchema: StructType = {
StructType(StructField("inputColumn", DoubleType) :: Nil)
}
//⽤于作为缓冲中间结果类型
override def bufferSchema: StructType = {
StructType(StructField("count", IntegerType) ::StructField("total", DoubleType)::
Nil)
}
//最终返回值类型
override def dataType: DataType = DoubleType
//表示函数输出结果类型是否⼀致
override def deterministic: Boolean = true
//设置聚合初始化状态
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0)=0 //总计数
buffer(1)=0.0 //总和
}
//将row中结果累加到buffer中
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
var historyCount = buffer.getInt(0)
var historyTotal = buffer.getDouble(1)
if(!input.isNullAt(0)){
historyTotal += input.getDouble(0)
historyCount += 1
buffer(0)= historyCount
buffer(1) = historyTotal
}
}
//做最终汇总操作
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0)=buffer1.getInt(0) + buffer2.getInt(0)
buffer1(1)=buffer1.getDouble(1) + buffer2.getDouble(1)
}
//计算最终结果
override def evaluate(buffer: Row): Any = {
buffer.getDouble(1) / buffer.getInt(0)
}
}
b、注册⽤户的函数
spark.udf.register("custom_avg",CustomUserDefinedAggregateFunction)
c、测试使⽤函数
select deptNo,custom_avg(salary) from t_user group by deptNo
+------+-------------------------------------------+
|deptNo|customuserdefinedaggregatefunction$(salary)|
+------+-------------------------------------------+
| 1| 14333.333333333334|
| 2| 19000.0|
+------+-------------------------------------------+
②、Type-Safe
a、定义聚合函数
case class Average(total:Double,count:Int)
object CustomAggregator extends Aggregator[GenericRowWithSchema,Average,Double]{
//初始化值
override def zero: Average = Average(0.0,0)
//计算局部结果
override def reduce(b: Average, a: GenericRowWithSchema): Average = {
Average(b.total+a.getAs[Double]("salary"),b.count+1)
}
//将局部结果合并
override def merge(b1: Average, b2: Average): Average = {
Average(b1.total+b2.total,b1.count+b2.count)
}
//计算总结果
override def finish(reduction: Average): Double = {
reduction.total/reduction.count
}
//指定中间结果类型的Encoders
override def bufferEncoder: Encoder[Average] = Encoders.product[Average]
//指定最终结果类型的Encoders
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
b、注册⽤户的函数
val averageSalary = CustomAggregator.toColumn.name("average_salary")
c、测试使⽤函数
import org.apache.spark.sql.functions._
userDF.select("deptNo","salary")
.groupBy("deptNo")
.agg(averageSalary,avg("salary"))
.show()
+------+------------------+------------------+
|deptNo| average_salary| avg(salary)|
+------+------------------+------------------+
| 1|14333.333333333334|14333.333333333334|
| 2| 19000.0| 19000.0|
+------+------------------+------------------+
六、Load&Save
1、paquet文件
①、save
Parquet仅仅是⼀种存储格式,它是语⾔、平台⽆关的,并且不需要和任何⼀种数据处理框架绑定.
var sql=
"""
select * from t_user
"""
val result: DataFrame = spark.sql(sql)
result.write.save("hdfs://CentOS:9000/results/paquet")
②、load
val dataFrame = spark.read.load("hdfs://CentOS:9000/results/paquet")
dataFrame.printSchema()
dataFrame.show()
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- salary: double (nullable = true)
|-- sex: boolean (nullable = true)
|-- job: string (nullable = true)
|-- deptNo: integer (nullable = true)
+-------+---+-------+-----+--------+------+
| name|age| salary| sex| job|deptNo|
+-------+---+-------+-----+--------+------+
|Michael| 29|20000.0| true| MANAGER| 1|
| Andy| 30|15000.0| true|SALESMAN| 1|
| Justin| 19| 8000.0| true| CLERK| 1|
| Kaine| 20|20000.0| true| MANAGER| 2|
| Lisa| 19|18000.0|false|SALESMAN| 2|
+-------+---+-------+-----+--------+------+
等价写法: spark.read.parquet(“hdfs://CentOS:9000/results/paquet”)
2、Json格式
①、save
var sql=
"""
select * from t_user
"""
val result: DataFrame = spark.sql(sql)
result.write
.format("json")
.mode(SaveMode.Overwrite)
.save("hdfs://CentOS:9000/results/json")
②、load
val dataFrame = spark.read
.format("json")
.load("hdfs://CentOS:9000/results/json")
dataFrame.printSchema()
dataFrame.show()
root
|-- age: long (nullable = true)
|-- deptNo: long (nullable = true)
|-- job: string (nullable = true)
|-- name: string (nullable = true)
|-- salary: double (nullable = true)
|-- sex: boolean (nullable = true)
+---+------+--------+-------+-------+-----+
|age|deptNo| job| name| salary| sex|
+---+------+--------+-------+-------+-----+
| 29| 1| MANAGER|Michael|20000.0| true|
| 30| 1|SALESMAN| Andy|15000.0| true|
| 19| 1| CLERK| Justin| 8000.0| true|
| 20| 2| MANAGER| Kaine|20000.0| true|
| 19| 2|SALESMAN| Lisa|18000.0|false|
+---+------+--------+-------+-------+-----+
⽤户也可以j简单写 spark.read.json(“hdfs://CentOS:9000/results/json”)
3、csv格式
①、save
var sql=
"""
select * from t_user
"""
val result: DataFrame = spark.sql(sql)
result.write
.format("csv")
.mode(SaveMode.Overwrite)
.option("sep", ",")//指定分隔符
.option("inferSchema", "true")//参照表schema信息
.option("header", "true")//是否产⽣表头信息
.save("hdfs://CentOS:9000/results/csv")
②、load
val dataFrame = spark.read
.format("csv")
.option("sep", ",")//指定分隔符
.option("inferSchema", "true")//参照表schema信息
.option("header", "true")//是否产⽣表头信息
.load("hdfs://CentOS:9000/results/csv")
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- salary: double (nullable = true)
|-- sex: boolean (nullable = true)
|-- job: string (nullable = true)
|-- deptNo: integer (nullable = true)
+-------+---+-------+-----+--------+------+
| name|age| salary| sex| job|deptNo|
+-------+---+-------+-----+--------+------+
|Michael| 29|20000.0| true| MANAGER| 1|
| Andy| 30|15000.0| true|SALESMAN| 1|
| Justin| 19| 8000.0| true| CLERK| 1|
| Kaine| 20|20000.0| true| MANAGER| 2|
| Lisa| 19|18000.0|false|SALESMAN| 2|
+-------+---+-------+-----+--------+------+
4、ORC格式
ORC的全称是(Optimized Row Columnar),ORC⽂件格式是⼀种Hadoop⽣态圈中的列式存储格式,它的产⽣早在2013年初,最初产⽣⾃Apache Hive,⽤于降低Hadoop数据存储空间和加速Hive查询速度。
①、save
var sql=
"""
select * from t_user
"""
val result: DataFrame = spark.sql(sql)
result.write
.format("orc")
.mode(SaveMode.Overwrite)
.save("hdfs://CentOS:9000/results/orc")
②、load
val dataFrame = spark.read
.orc("hdfs://CentOS:9000/results/orc")
dataFrame.printSchema()
dataFrame.show()
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- salary: double (nullable = true)
|-- sex: boolean (nullable = true)
|-- job: string (nullable = true)
|-- deptNo: integer (nullable = true)
+-------+---+-------+-----+--------+------+
| name|age| salary| sex| job|deptNo|
+-------+---+-------+-----+--------+------+
|Michael| 29|20000.0| true| MANAGER| 1|
| Andy| 30|15000.0| true|SALESMAN| 1|
| Justin| 19| 8000.0| true| CLERK| 1|
| Kaine| 20|20000.0| true| MANAGER| 2|
| Lisa| 19|18000.0|false|SALESMAN| 2|
+-------+---+-------+-----+--------+------+
5、SQL读取文件
val parqeutDF = spark.sql("SELECT * FROM parquet.`hdfs://CentOS:9000/results/paquet`")
val jsonDF = spark.sql("SELECT * FROM json.`hdfs://CentOS:9000/results/json`")
val orcDF = spark.sql("SELECT * FROM orc.`hdfs://CentOS:9000/results/orc/`")
//val csvDF = spark.sql("SELECT * FROM csv.`hdfs://CentOS:9000/results/csv/`")
parqeutDF.show()
jsonDF.show()
orcDF.show()
// csvDF.show()
6、JDBC数据读取
①、load
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
val dataFrame = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://CentOS:3306/test")
.option("dbtable", "t_user")
.option("user", "root")
.option("password", "root")
.load()
dataFrame.show()
+---+--------+-----+---+----------+
| id| name| sex|age| birthDay|
+---+--------+-----+---+----------+
| 0|zhangsan| true| 20|2020-01-11|
| 1| lisi|false| 25|2020-01-10|
| 3| wangwu| true| 36|2020-01-17|
| 4| zhao6|false| 50|1990-02-08|
| 5| win7| true| 20|1991-02-08|
| 6| win8|false| 28|2000-01-01|
+---+--------+-----+---+----------+
②、save
val props = new Properties()
props.put("user", "root")
props.put("password", "root")
result .write
.mode(SaveMode.Overwrite)
.jdbc("jdbc:mysql://CentOS:3306/test","t_user",props)
系统会⾃动创建t_user表
七、Spark&Hive集成
1、修改hive-site.xml
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://CentOS:3306/hive?createDatabaseIfNotExist=true</value>
</property> <property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property> <property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property> <property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
</property>
<!--开启MetaStore服务,⽤于Spark读取hive中的元数据-->
<property>
<name>hive.metastore.uris</name>
<value>thrift://CentOS:9083</value>
</property>
2、启动metastore服务
[[email protected] apache-hive-1.2.2-bin]# ./bin/hive --service metastore >/dev/null 2>&1 &
[1] 55017
3、导⼊以下依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.4.5</version>
</dependency>
4、编写如下代码
//配置spark
val spark = SparkSession.builder()
.appName("Spark Hive Example")
.master("local[*]")
.config("hive.metastore.uris", "thrift://CentOS:9083")
.enableHiveSupport() //启动hive⽀持
.getOrCreate()
spark.sql("show databases").show()
spark.sql("use baizhi")
spark.sql("select * from t_emp").na.fill(0.0).show()
spark.close()
+-----+------+---------+----+-------------------+-------+-------+------+
|empno| ename| job| mgr| hiredate| sal| comm|deptno|
+-----+------+---------+----+-------------------+-------+-------+------+
| 7369| SMITH| CLERK|7902|1980-12-17 00:00:00| 800.00| 0.00| 20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20 00:00:00|1600.00| 300.00| 30|
| 7521| WARD| SALESMAN|7698|1981-02-22 00:00:00|1250.00| 500.00| 30|
| 7566| JONES| MANAGER|7839|1981-04-02 00:00:00|2975.00| 0.00| 20|
| 7654|MARTIN| SALESMAN|7698|1981-09-28 00:00:00|1250.00|1400.00| 30|
| 7698| BLAKE| MANAGER|7839|1981-05-01 00:00:00|2850.00| 0.00| 30|
| 7782| CLARK| MANAGER|7839|1981-06-09 00:00:00|2450.00| 0.00| 10|
| 7788| SCOTT| ANALYST|7566|1987-04-19 00:00:00|1500.00| 0.00| 20|
| 7839| KING|PRESIDENT| 0|1981-11-17 00:00:00|5000.00| 0.00| 10|
| 7844|TURNER| SALESMAN|7698|1981-09-08 00:00:00|1500.00| 0.00| 30|
| 7876| ADAMS| CLERK|7788|1987-05-23 00:00:00|1100.00| 0.00| 20|
| 7900| JAMES| CLERK|7698|1981-12-03 00:00:00| 950.00| 0.00| 30|
| 7902| FORD| ANALYST|7566|1981-12-03 00:00:00|3000.00| 0.00| 20|
| 7934|MILLER| CLERK|7782|1982-01-23 00:00:00|1300.00| 0.00| 10|
+-----+------+---------+----+-------------------+-------+-------+------+
八、Spark Catalyst
⼤数据相关技术与传统型数据库技术很多都是相互融合、互相借鉴的。传统型数据库强势在于其久经考验的SQL优化器经验,弱势在于分布式领域的⾼可⽤性、容错性、扩展性等,假以时⽇,让其经过⼀定的改造,⽐如引⼊Paxos、ra!等,强化⾃⼰在分布式领域的能⼒,相信⼀定会在⼤数据系统中占有⼀席之地。相反,⼤数据相关技术优势在于其天⽣的扩展性、可⽤性、容错性等,但其SQL优化器经验却基本全部来⾃于传统型数据库,当然,针对列式存储⼤数据SQL优化器会有⼀定的优化策略。
本⽂主要介绍SparkSQL的优化器系统Catalyst,上⽂讲到其设计思路基本都来⾃于传统型数据库,⽽且
和⼤多数当前的⼤数据SQL处理引擎设计基本相同(Impala、Presto、Hive(Calcite)等),因此通过本⽂的学习也可以基本了解所有其他SQL处理引擎的⼯作原理。
SQL优化器核⼼执⾏策略主要分为两个⼤的⽅向:基于规则优化(RBO)以及基于代价优化(CBO),基于规则优化是⼀种经验式、启发式地优化思路,更多地依靠前辈总结出来的优化规则,简单易⾏且能够覆盖到⼤部分优化逻辑,但是对于核⼼优化算⼦Join却显得有点⼒不从⼼。举个简单的例⼦,两个表执Join到底应该使⽤BroadcastHashJoin还是SortMergeJoin?当前SparkSQL的⽅式是通过⼿⼯设定参数来确定,如果⼀个表的数据量⼩于这个值就使⽤BroadcastHashJoin,但是这种⽅案显得很不优雅,很不灵
活。基于代价优化就是为了解决这类问题,它会针对每个Join评估当前两张表使⽤每种Join策略的代
价,根据代价估算确定⼀种代价最⼩的⽅案。
本⽂将会重点介绍基于规则的优化策略,后续⽂章会详细介绍基于代价的优化策略。下图中红⾊框框部
分将是本⽂的介绍重点:
1、Tree&Rule
在介绍SQL优化器⼯作原理之前,有必要⾸先介绍两个重要的数据结构:Tree和Rule。相信⽆论对SQL优化器有⽆了解,都肯定知道SQL语法树这个概念,不错,SQL语法树就是SQL语句通过编译器之后会被解析成⼀棵树状结构。这棵树会包含很多节点对象,每个节点都拥有特定的数据类型,同时会有0个或多个孩⼦节点(节点对象在代码中定义为TreeNode对象),下图是个简单的示例:
如上图所示,箭头左边表达式有3种数据类型(Literal表示常量、Attribute表示变量、Add表示动作),
表示x+(1+2)。映射到右边树状结构后,每⼀种数据类型就会变成⼀个节点。另外,Tree还有⼀个⾮常重
要的特性,可以通过⼀定的规则进⾏等价变换,如下图:
expression.transform{
case Add(Literal(x,IntegerType),Literal(y,IntegerType)) => Literal(x+y)
}
上图定义了⼀个等价变换规则(Rule):两个Integer类型的常量相加可以等价转换为⼀个Integer常量,这个规则其实很简单,对于上⽂中提到的表达式x+(1+2)来说就可以转变为x+3。对于程序来讲,如何找到两个Integer常量呢?其实就是简单的⼆叉树遍历算法,每遍历到⼀个节点,就模式匹配当前节点为Add、左右⼦节点是Integer常量的结构,定位到之后将此三个节点替换为⼀个Literal类型的节点。上⾯⽤⼀个最简单的示例来说明等价变换规则以及如何将规则应⽤于语法树。在任何⼀个SQL优化器中,通常会定义⼤量的Rule(后⾯会讲到),SQL优化器会遍历语法树中每个节点,针对遍历到的节点模式匹配所有给定规则(Rule),如果有匹配成功的,就进⾏相应转换,如果所有规则都匹配失败,就继续遍历下⼀个节点。
2、Catalyst工作流程
任何⼀个优化器⼯作原理都⼤同⼩异:SQL语句⾸先通过Parser模块被解析为语法树,此棵树称为Unresolved Logical Plan;Unresolved Logical Plan通过Analyzer模块借助于数据元数据解析为Logical
Plan;此时再通过各种基于规则的优化策略进⾏深⼊优化,得到Optimized Logical Plan;优化后的逻辑
执⾏计划依然是逻辑的,并不能被Spark系统理解,此时需要将此逻辑执⾏计划转换为Physical Plan;为
了更好的对整个过程进⾏理解,下⽂通过⼀个简单示例进⾏解释。
①、Parser
Parser简单来说是将SQL字符串切分成⼀个⼀个Token,再根据⼀定语义规则解析为⼀棵语法树。Parser
模块⽬前基本都使⽤第三⽅类库ANTLR进⾏实现,⽐如Hive、 Presto、SparkSQL等。下图是⼀个示例性的SQL语句(有两张表,其中people表主要存储⽤户基本信息,score表存储⽤户的各种成绩),通过
Parser解析后的AST语法树如右图所示:
②、Analyzer
通过解析后的逻辑执⾏计划基本有了⻣架,但是系统并不知道score、sum这些都是些什么⻤,此时需要
基本的元数据信息来表达这些词素,最重要的元数据信息主要包括两部分:表的Scheme和基本函数信
息,表的scheme主要包括表的基本定义(列名、数据类型)、表的数据格式(Json、Text)、表的物理
位置等,基本函数信息主要指类信息。
Analyzer会再次遍历整个语法树,对树上的每个节点进⾏数据类型绑定以及函数绑定,⽐如people词素
会根据元数据表信息解析为包含age、id以及name三列的表,people.age会被解析为数据类型为int的变
量,sum会被解析为特定的聚合函数,如下图所示:
SparkSQL中Analyzer定义了各种解析规则,有兴趣深⼊了解的童鞋可以查看Analyzer类,其中定义了基本的解析规则,如下
③、Optimizer
优化器是整个Catalyst的核⼼,上⽂提到优化器分为基于规则优化和基于代价优化两种,当前SparkSQL
2.1依然没有很好的⽀持基于代价优化(下⽂细讲),此处只介绍基于规则的优化策略,基于规则的优化
策略实际上就是对语法树进⾏⼀次遍历,模式匹配能够满⾜特定规则的节点,再进⾏相应的等价转换。
因此,基于规则优化说到底就是⼀棵树等价地转换为另⼀棵树。SQL中经典的优化规则有很多,下⽂结
合示例介绍三种⽐较常⻅的规则:谓词下推(Predicate Pushdown)、常量累加(Constant Folding)和列值裁剪(Column Pruning)。
上图左边是经过Analyzer解析后的语法树,语法树中两个表先做join,之后再使⽤age>10对结果进⾏过
滤。⼤家知道join算⼦通常是⼀个⾮常耗时的算⼦,耗时多少⼀般取决于参与join的两个表的⼤⼩,如果
能够减少参与join两表的⼤⼩,就可以⼤⼤降低join算⼦所需时间。谓词下推就是这样⼀种功能,它会将
过滤操作下推到join之前进⾏,上图中过滤条件age>0以及id!=null两个条件就分别下推到了join之前。这
样,系统在扫描数据的时候就对数据进⾏了过滤,参与join的数据量将会得到显著的减少,join耗时必然
也会降低。
常量累加其实很简单,就是上⽂中提到的规则 x+(1+2) -> x+3,虽然是⼀个很⼩的改动,但是意义巨⼤。示例如果没有进⾏优化的话,每⼀条结果都需要执⾏⼀次100+80的操作,然后再与变量math_score以及english_score相加,⽽优化后就不需要再执⾏100+80操作。
列值裁剪是另⼀个经典的规则,示例中对于people表来说,并不需要扫描它的所有列值,⽽只需要列值
id,所以在扫描people之后需要将其他列进⾏裁剪,只留下列id。这个优化⼀⽅⾯⼤幅度减少了⽹络、
内存数据量消耗,另⼀⽅⾯对于列存数据库(Parquet)来说⼤⼤提⾼了扫描效率。除此之外,Catalyst
还定义了很多其他优化规则,有兴趣深⼊了解的童鞋可以查看Optimizer类,下图简单的截取⼀部分规
则:
⾄此,逻辑执⾏计划已经得到了⽐较完善的优化,然⽽,逻辑执⾏计划依然没办法真正执⾏,他们只是
逻辑上可⾏,实际上Spark并不知道如何去执⾏这个东⻄。⽐如Join只是⼀个抽象概念,代表两个表根据
相同的id进⾏合并,然⽽具体怎么实现这个合并,逻辑执⾏计划并没有说明。
此时就需要将逻辑执⾏计划转换为物理执⾏计划,将逻辑上可⾏的执⾏计划变为Spark可以真正执⾏的
计划。⽐如Join算⼦,Spark根据不同场景为该算⼦制定了不同的算法策略,有BroadcastHashJoin、
Shu"leHashJoin以及SortMergeJoin等(可以将Join理解为⼀个接⼝,BroadcastHashJoin是其中⼀个具体实现),物理执⾏计划实际上就是在这些具体实现中挑选⼀个耗时最⼩的算法实现,这个过程涉及到基于代价优化策略。
④、SparkSQL执行计划
⾄此,笔者通过⼀个简单的示例完整的介绍了Catalyst的整个⼯作流程,包括Parser阶段、Analyzer阶
段、Optimize阶段以及Physical Planning阶段。有同学可能会⽐较感兴趣Spark环境下如何查看⼀条具体
的SQL的整个过程,在此介绍两种⽅法:
- 使⽤queryExecution⽅法查看逻辑执⾏计划,使⽤explain⽅法查看物理执⾏计划,分别如下所示:
- 使⽤Spark WebUI进⾏查看,如下图所示: