Spark SQL is a Spark module for processing structured data. It provides a programming abstraction called DataFrame and can also act as a distributed SQL query engine. Spark SQL can read data from an existing Hive installation as well as from RDBMS databases.
Spark 2.0 introduced a new concept, SparkSession. A SparkSession is essentially the combination of SQLContext and HiveContext, so the APIs available on SQLContext and HiveContext are equally available on SparkSession. SparkSession wraps a SparkContext internally, so the actual work is still carried out by the SparkContext; SparkSession simply gives users a single, unified entry point to Spark's features.
The following sections briefly introduce how to create a SparkSession and how to use Spark SQL.
1. SparkSession
A SparkSession is obtained through a builder rather than a public constructor (the builder design pattern). The following code shows how to create one:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf: SparkConf = new SparkConf()
// local[1]: run Spark locally with a single worker thread
conf.setMaster("local[1]").setAppName("UserOrderDataFrameExample")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
If we need the SparkContext, we can obtain it from the SparkSession:
val sparkContext: SparkContext = sparkSession.sparkContext
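As mentioned at the beginning, Spark SQL can also read data from an existing Hive installation or from an RDBMS. Below is a minimal sketch; the JDBC URL, table name, and credentials are hypothetical placeholders, and enableHiveSupport() requires the Hive classes to be on the classpath:

val hiveSession: SparkSession = SparkSession.builder()
  .config(conf)
  .enableHiveSupport() // lets the session read tables from an existing Hive installation
  .getOrCreate()

val jdbcDf = hiveSession.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/shop") // hypothetical RDBMS connection
  .option("dbtable", "orders")                       // hypothetical source table
  .option("user", "root")
  .option("password", "secret")
  .load()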
Once the SparkSession has been created, it can be used to create DataFrames and run Spark SQL over the data. Within a Spark application, SparkSession.builder().getOrCreate() returns the already-created session if one exists, so the application effectively works with a single shared SparkSession.
2. SparkSQL
The following example uses Spark SQL to join user and order data; the data is generated automatically by the program.
2.1 Defining the user and order data structures
The code is as follows:
import java.sql.Timestamp

/**
 * The user data structure.
 **/
case class User(userId: Int, userName: String, tel: String)

/**
 * The user-order relation data structure.
 **/
case class UserOrder(userId: Int, orderId: String)

/**
 * The order data structure.
 **/
case class Order(orderId: String, time: Timestamp, money: Double)

/**
 * The user order data source, holding the generated users, relations, and orders.
 **/
class UserOrderSource(var users: List[User], var userOrder: List[UserOrder], var orders: List[Order])
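Spark SQL infers the DataFrame schema from these case classes via Scala reflection: an Int field such as userId becomes an integer column with nullable = false, while String and Timestamp fields become nullable string and timestamp columns, as the printSchema() output below shows.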
2.2 Generating users and order details
The code is as follows:
import java.sql.Timestamp
import java.util.UUID
import scala.collection.mutable.ListBuffer

/**
 * Order generator.
 **/
object OrderGenerator {
  var tempUserId: Int = 0

  def makerOrder(): UserOrderSource = {
    val users: ListBuffer[User] = new ListBuffer[User]()
    val orders: ListBuffer[Order] = new ListBuffer[Order]()
    val userOrders: ListBuffer[UserOrder] = new ListBuffer[UserOrder]()
    var user: User = null
    var order: Order = null
    var userOrder: UserOrder = null
    // Create 10 users
    for (index <- 1 to 10) {
      user = User(createUserId(), s"UserName-${index}", s"1882345889${index - 1}")
      users += user
      // Create 3 orders per user
      for (num <- 1 to 3) {
        order = Order(createOrderId(), new Timestamp(System.currentTimeMillis()), createOrderMoney())
        userOrder = UserOrder(user.userId, order.orderId)
        orders += order
        userOrders += userOrder
      }
    }
    new UserOrderSource(users.toList, userOrders.toList, orders.toList)
  }

  // Generate a sequential user id; synchronized so ids stay unique across threads
  def createUserId(): Int = {
    this.synchronized {
      tempUserId = tempUserId + 1
      tempUserId
    }
  }

  // Use a random UUID as the order id
  def createOrderId(): String = {
    UUID.randomUUID().toString
  }

  // Generate a random amount between 0.00 and 999.99 with two decimal places
  def createOrderMoney(): Double = {
    (Math.random() * 100000).toInt / 100d
  }
}
2.3 Operating on DataFrames with Spark SQL
The code is as follows:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * DataFrame operations on the user order data.
 */
object UserOrderDataFrame {
  var userOrderSource: UserOrderSource = null

  def main(args: Array[String]): Unit = {
    println("this is about user order dataframe for spark example.===================")
    try {
      // Generate the user and order data
      userOrderSource = OrderGenerator.makerOrder()
      // Initialize Spark
      initSpark()
      dataFrameForSql()
      dataFrameForJoin()
    } catch {
      case e: Throwable =>
        e.printStackTrace()
    } finally {
      // getOrCreate() returns the session created in initSpark(); close() stops it
      SparkSession.builder().getOrCreate().close()
    }
    println("this is one user order dataframe for spark example.===================")
  }

  def initSpark(): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local[1]").setAppName("UserOrderDataFrameExample")
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  }

  /**
   * Operate on the DataFrames with SQL.
   */
  def dataFrameForSql(): Unit = {
    val sparkSession: SparkSession = SparkSession.builder().getOrCreate()
    // Create the DataFrames from the generated case class lists
    val dfUsers: DataFrame = sparkSession.createDataFrame[User](userOrderSource.users)
    val dfOrders: DataFrame = sparkSession.createDataFrame[Order](userOrderSource.orders)
    val dfUserOrders: DataFrame = sparkSession.createDataFrame[UserOrder](userOrderSource.userOrder)
    dfUsers.printSchema()
    dfOrders.printSchema()
    dfUserOrders.printSchema()
    // Register temporary views so the DataFrames can be queried by name
    dfUsers.createOrReplaceTempView("User")
    dfOrders.createOrReplaceTempView("Order")
    dfUserOrders.createOrReplaceTempView("UserOrder")
    // Join the three views through the UserOrder relation
    val sql = "SELECT T1.*,T2.*,T3.* FROM User T1 INNER JOIN UserOrder T2 ON T1.userId=T2.userId INNER JOIN Order T3 ON T2.orderId=T3.orderId"
    val dfResult: DataFrame = sparkSession.sql(sql)
    println("SparkSQL join result=================================")
    dfResult.printSchema()
    dfResult.show(100)
  }

  // dataFrameForJoin() is defined in section 2.4 below
}
Running the above code produces the following output:
root
|-- userId: integer (nullable = false)
|-- userName: string (nullable = true)
|-- tel: string (nullable = true)
root
|-- orderId: string (nullable = true)
|-- time: timestamp (nullable = true)
|-- money: double (nullable = false)
root
|-- userId: integer (nullable = false)
|-- orderId: string (nullable = true)
2018-05-30 18:18:06 INFO SparkSqlParser: Parsing command: User
2018-05-30 18:18:06 INFO SparkSqlParser: Parsing command: Order
2018-05-30 18:18:06 INFO SparkSqlParser: Parsing command: UserOrder
2018-05-30 18:18:06 INFO SparkSqlParser: Parsing command: SELECT T1.*,T2.*,T3.* FROM User T1 INNER JOIN UserOrder T2 ON T1.userId=T2.userId INNER JOIN Order T3 ON T2.orderId=T3.orderId
SparkSQL join result=================================
root
|-- userId: integer (nullable = false)
|-- userName: string (nullable = true)
|-- tel: string (nullable = true)
|-- userId: integer (nullable = false)
|-- orderId: string (nullable = true)
|-- orderId: string (nullable = true)
|-- time: timestamp (nullable = true)
|-- money: double (nullable = false)
+------+-----------+-----------+------+--------------------+--------------------+--------------------+------+
|userId| userName| tel|userId| orderId| orderId| time| money|
+------+-----------+-----------+------+--------------------+--------------------+--------------------+------+
| 1| UserName-1|18823458890| 1|f44be3a5-8e97-47f...|f44be3a5-8e97-47f...|2018-05-30 18:17:...|513.33|
| 1| UserName-1|18823458890| 1|ef516070-acec-4ee...|ef516070-acec-4ee...|2018-05-30 18:17:...|400.59|
| 1| UserName-1|18823458890| 1|ef004d7d-b0d8-4d7...|ef004d7d-b0d8-4d7...|2018-05-30 18:17:...|336.88|
| 2| UserName-2|18823458891| 2|f45b08ff-d04e-408...|f45b08ff-d04e-408...|2018-05-30 18:17:...|693.88|
| 2| UserName-2|18823458891| 2|e2d6fb42-1558-42e...|e2d6fb42-1558-42e...|2018-05-30 18:17:...|278.61|
| 2| UserName-2|18823458891| 2|9bc923d4-528a-4dc...|9bc923d4-528a-4dc...|2018-05-30 18:17:...|719.47|
| 3| UserName-3|18823458892| 3|e0dfd87b-36e5-49f...|e0dfd87b-36e5-49f...|2018-05-30 18:17:...|425.51|
| 3| UserName-3|18823458892| 3|149f0659-12c2-492...|149f0659-12c2-492...|2018-05-30 18:17:...|129.92|
| 3| UserName-3|18823458892| 3|c155a1c8-72b4-4f3...|c155a1c8-72b4-4f3...|2018-05-30 18:17:...|258.57|
| 4| UserName-4|18823458893| 4|54b0c494-a096-4e6...|54b0c494-a096-4e6...|2018-05-30 18:17:...|955.21|
| 4| UserName-4|18823458893| 4|bc6a24b8-6d77-4a5...|bc6a24b8-6d77-4a5...|2018-05-30 18:17:...| 69.42|
| 4| UserName-4|18823458893| 4|b4d68db1-7b02-44d...|b4d68db1-7b02-44d...|2018-05-30 18:17:...|571.32|
| 5| UserName-5|18823458894| 5|a6ccd370-494c-4f2...|a6ccd370-494c-4f2...|2018-05-30 18:17:...| 72.92|
| 5| UserName-5|18823458894| 5|646adb1c-73f1-44e...|646adb1c-73f1-44e...|2018-05-30 18:17:...| 69.54|
| 5| UserName-5|18823458894| 5|532be792-5343-47a...|532be792-5343-47a...|2018-05-30 18:17:...|179.44|
| 6| UserName-6|18823458895| 6|78842349-3c60-486...|78842349-3c60-486...|2018-05-30 18:17:...|111.39|
| 6| UserName-6|18823458895| 6|c4cda44d-42ae-46c...|c4cda44d-42ae-46c...|2018-05-30 18:17:...|111.26|
| 6| UserName-6|18823458895| 6|26b90354-7e46-482...|26b90354-7e46-482...|2018-05-30 18:17:...|336.82|
| 7| UserName-7|18823458896| 7|b0e51c7b-7538-4c6...|b0e51c7b-7538-4c6...|2018-05-30 18:17:...|399.73|
| 7| UserName-7|18823458896| 7|fd8acde2-b115-485...|fd8acde2-b115-485...|2018-05-30 18:17:...|295.53|
| 7| UserName-7|18823458896| 7|2c233d01-59fa-430...|2c233d01-59fa-430...|2018-05-30 18:17:...| 52.69|
| 8| UserName-8|18823458897| 8|a73308fd-f3de-4e4...|a73308fd-f3de-4e4...|2018-05-30 18:17:...| 91.96|
| 8| UserName-8|18823458897| 8|a21deab3-8d88-493...|a21deab3-8d88-493...|2018-05-30 18:17:...|343.63|
| 8| UserName-8|18823458897| 8|25092940-ecde-487...|25092940-ecde-487...|2018-05-30 18:17:...|860.76|
| 9| UserName-9|18823458898| 9|5f2298bf-0859-425...|5f2298bf-0859-425...|2018-05-30 18:17:...|907.78|
| 9| UserName-9|18823458898| 9|cb71a2f9-f973-4ad...|cb71a2f9-f973-4ad...|2018-05-30 18:17:...|666.09|
| 9| UserName-9|18823458898| 9|f64b4ede-7faa-421...|f64b4ede-7faa-421...|2018-05-30 18:17:...|134.23|
| 10|UserName-10|18823458899| 10|2eb50d4e-5230-487...|2eb50d4e-5230-487...|2018-05-30 18:17:...|957.02|
| 10|UserName-10|18823458899| 10|faa13220-d459-4b4...|faa13220-d459-4b4...|2018-05-30 18:17:...|888.55|
| 10|UserName-10|18823458899| 10|8d07cc86-9b13-4d2...|8d07cc86-9b13-4d2...|2018-05-30 18:17:...|228.51|
+------+-----------+-----------+------+--------------------+--------------------+--------------------+------+
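Note that because the query selects T1.*, T2.*, and T3.*, the result keeps both copies of the userId and orderId join columns.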
2.4 Joining data with the DataFrame join method
The code is as follows (this method belongs inside the UserOrderDataFrame object above):
/**
 * Join DataFrames using the DataFrame join method.
 */
def dataFrameForJoin(): Unit = {
  val sparkSession: SparkSession = SparkSession.builder().getOrCreate()
  // Create the DataFrames from the generated case class lists
  val dfUsers: DataFrame = sparkSession.createDataFrame[User](userOrderSource.users)
  val dfOrders: DataFrame = sparkSession.createDataFrame[Order](userOrderSource.orders)
  val dfUserOrders: DataFrame = sparkSession.createDataFrame[UserOrder](userOrderSource.userOrder)
  dfUsers.printSchema()
  dfOrders.printSchema()
  dfUserOrders.printSchema()
  // Joining on a column name keeps a single copy of the join column
  val dfResult: DataFrame = dfUsers.join(dfUserOrders, "userId").join(dfOrders, "orderId")
  println("DataFrame join result=================================")
  dfResult.printSchema()
  dfResult.show(100)
}
The output is as follows:
root
|-- userId: integer (nullable = false)
|-- userName: string (nullable = true)
|-- tel: string (nullable = true)
root
|-- orderId: string (nullable = true)
|-- time: timestamp (nullable = true)
|-- money: double (nullable = false)
root
|-- userId: integer (nullable = false)
|-- orderId: string (nullable = true)
DataFrame join result=================================
root
|-- orderId: string (nullable = true)
|-- userId: integer (nullable = false)
|-- userName: string (nullable = true)
|-- tel: string (nullable = true)
|-- time: timestamp (nullable = true)
|-- money: double (nullable = false)
+--------------------+------+-----------+-----------+--------------------+------+
| orderId|userId| userName| tel| time| money|
+--------------------+------+-----------+-----------+--------------------+------+
|f44be3a5-8e97-47f...| 1| UserName-1|18823458890|2018-05-30 18:17:...|513.33|
|ef516070-acec-4ee...| 1| UserName-1|18823458890|2018-05-30 18:17:...|400.59|
|ef004d7d-b0d8-4d7...| 1| UserName-1|18823458890|2018-05-30 18:17:...|336.88|
|f45b08ff-d04e-408...| 2| UserName-2|18823458891|2018-05-30 18:17:...|693.88|
|e2d6fb42-1558-42e...| 2| UserName-2|18823458891|2018-05-30 18:17:...|278.61|
|9bc923d4-528a-4dc...| 2| UserName-2|18823458891|2018-05-30 18:17:...|719.47|
|e0dfd87b-36e5-49f...| 3| UserName-3|18823458892|2018-05-30 18:17:...|425.51|
|149f0659-12c2-492...| 3| UserName-3|18823458892|2018-05-30 18:17:...|129.92|
|c155a1c8-72b4-4f3...| 3| UserName-3|18823458892|2018-05-30 18:17:...|258.57|
|54b0c494-a096-4e6...| 4| UserName-4|18823458893|2018-05-30 18:17:...|955.21|
|bc6a24b8-6d77-4a5...| 4| UserName-4|18823458893|2018-05-30 18:17:...| 69.42|
|b4d68db1-7b02-44d...| 4| UserName-4|18823458893|2018-05-30 18:17:...|571.32|
|a6ccd370-494c-4f2...| 5| UserName-5|18823458894|2018-05-30 18:17:...| 72.92|
|646adb1c-73f1-44e...| 5| UserName-5|18823458894|2018-05-30 18:17:...| 69.54|
|532be792-5343-47a...| 5| UserName-5|18823458894|2018-05-30 18:17:...|179.44|
|78842349-3c60-486...| 6| UserName-6|18823458895|2018-05-30 18:17:...|111.39|
|c4cda44d-42ae-46c...| 6| UserName-6|18823458895|2018-05-30 18:17:...|111.26|
|26b90354-7e46-482...| 6| UserName-6|18823458895|2018-05-30 18:17:...|336.82|
|b0e51c7b-7538-4c6...| 7| UserName-7|18823458896|2018-05-30 18:17:...|399.73|
|fd8acde2-b115-485...| 7| UserName-7|18823458896|2018-05-30 18:17:...|295.53|
|2c233d01-59fa-430...| 7| UserName-7|18823458896|2018-05-30 18:17:...| 52.69|
|a73308fd-f3de-4e4...| 8| UserName-8|18823458897|2018-05-30 18:17:...| 91.96|
|a21deab3-8d88-493...| 8| UserName-8|18823458897|2018-05-30 18:17:...|343.63|
|25092940-ecde-487...| 8| UserName-8|18823458897|2018-05-30 18:17:...|860.76|
|5f2298bf-0859-425...| 9| UserName-9|18823458898|2018-05-30 18:17:...|907.78|
|cb71a2f9-f973-4ad...| 9| UserName-9|18823458898|2018-05-30 18:17:...|666.09|
|f64b4ede-7faa-421...| 9| UserName-9|18823458898|2018-05-30 18:17:...|134.23|
|2eb50d4e-5230-487...| 10|UserName-10|18823458899|2018-05-30 18:17:...|957.02|
|faa13220-d459-4b4...| 10|UserName-10|18823458899|2018-05-30 18:17:...|888.55|
|8d07cc86-9b13-4d2...| 10|UserName-10|18823458899|2018-05-30 18:17:...|228.51|
+--------------------+------+-----------+-----------+--------------------+------+
Joining with the DataFrame join method returns the same rows as the Spark SQL query; the only difference is that SELECT T1.*,T2.*,T3.* keeps both copies of the userId and orderId join columns, while joining on a column name keeps a single copy. Besides the default inner join, the join method also accepts a join type argument such as "left_outer" or "right_outer", which produces the same results as SQL's LEFT OUTER JOIN and RIGHT OUTER JOIN.
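For example, here is a minimal sketch of a left outer join over the DataFrames created above (dfLeftJoin is an illustrative name):

val dfLeftJoin: DataFrame = dfUsers.join(dfUserOrders, Seq("userId"), "left_outer")
dfLeftJoin.show()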
Spark SQL makes it easy to join data across multiple DataFrames, which is very convenient for users who are accustomed to SQL. For aggregation and statistics, Spark SQL also eliminates a lot of hand-written collection-manipulation code.
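For instance, a minimal sketch that totals each user's order amount, reusing the temporary views registered in dataFrameForSql() (dfTotals and totalMoney are illustrative names):

val dfTotals: DataFrame = sparkSession.sql(
  "SELECT T1.userId, SUM(T3.money) AS totalMoney FROM User T1 " +
    "INNER JOIN UserOrder T2 ON T1.userId = T2.userId " +
    "INNER JOIN Order T3 ON T2.orderId = T3.orderId " +
    "GROUP BY T1.userId")
dfTotals.show()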
Spark SQL is fast, easy to use, and general-purpose, and it runs wherever Spark itself runs, which is why it is popular with so many developers.
