
Using Spark SQL to Operate on DataFrames

Spark SQL is a Spark module for processing structured data. It provides a programming abstraction called DataFrame and can also act as a distributed SQL query engine. Spark SQL can read data from an existing Hive installation, and it can also read data from RDBMS databases.

Spark 2.0 introduced a new concept, the SparkSession. A SparkSession is essentially a combination of SQLContext and HiveContext, so the APIs available on SQLContext and HiveContext can be used on a SparkSession as well. A SparkSession wraps a SparkContext internally, so the actual work is still carried out by the SparkContext; the SparkSession simply gives users a single, unified entry point to Spark's functionality.

The sections below briefly cover creating a SparkSession and using Spark SQL.

1. SparkSession

The design of SparkSession follows the factory design pattern; the code below shows how to create one.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf: SparkConf = new SparkConf()
conf.setMaster("local[1]").setAppName("UserOrderDataFrameExample")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
           

If we need the SparkContext, we can obtain it from the SparkSession:

val sparkContext: SparkContext = sparkSession.sparkContext
           

Once the SparkSession has been created, it can be used to create DataFrames and to operate on data with Spark SQL. Within one Spark application, a single SparkSession is normally created and shared.
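
For example, reading from an RDBMS goes through Spark's JDBC data source. The snippet below is a minimal sketch rather than part of this article's example: the URL, table name, and credentials are placeholders, and the matching JDBC driver must be on the classpath.

// Read a table from a relational database into a DataFrame.
// The connection details here are hypothetical placeholders.
val jdbcDF: DataFrame = sparkSession.read
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/shop")
    .option("dbtable", "orders")
    .option("user", "root")
    .option("password", "secret")
    .load()
jdbcDF.createOrReplaceTempView("DbOrder")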

2. Spark SQL

This section gives a simple demonstration of joining user and order data with Spark SQL; the user and order data are generated automatically by the program.

2.1 Defining the user and order data structures

The code is as follows:

import java.sql.Timestamp

/**
  * Data structure for a user
  **/
case class User(userId: Int, userName: String, tel: String)

/**
  * Data structure linking a user to an order
  **/
case class UserOrder(userId: Int, orderId: String)

/**
  * Data structure for an order
  **/
case class Order(orderId: String, time: Timestamp, money: Double)

/**
  * Container for the generated user and order data
  **/
class UserOrderSource(var users: List[User], var userOrder: List[UserOrder], var orders: List[Order])
           

2.2 Generating the user and order data

The code is as follows:

import java.sql.Timestamp
import java.util.UUID

import scala.collection.mutable.ListBuffer

/**
  * Order generator
  **/
object OrderGenerator {
    var tempUserId: Int = 0

    def makerOrder(): UserOrderSource = {
        val users: ListBuffer[User] = new ListBuffer[User]()
        val orders: ListBuffer[Order] = new ListBuffer[Order]
        val userOrders: ListBuffer[UserOrder] = new ListBuffer[UserOrder]()
        var user: User = null
        var order: Order = null
        var userOrder: UserOrder = null
        // Create 10 users
        for (index <- 1 to 10) {
            user = User(createUserId(), s"UserName-${index}", s"1882345889${index - 1}")
            users += user
            // Create 3 orders for each user
            for (num <- 1 to 3) {
                order = Order(createOrderId(), new Timestamp(System.currentTimeMillis()), createOrderMoney())
                userOrder = UserOrder(user.userId, order.orderId)
                orders += order
                userOrders += userOrder
            }
        }

        new UserOrderSource(users.toList, userOrders.toList, orders.toList)
    }

    def createUserId(): Int = {
        this.synchronized {
            tempUserId = tempUserId + 1
            tempUserId
        }
    }

    def createOrderId(): String = {
        val uuid: UUID = UUID.randomUUID()
        uuid.toString()
    }

    def createOrderMoney(): Double = {
        (Math.random() * 100000).toInt / 100d
    }
}
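
A quick sanity check of the generator (a hypothetical snippet, not part of the original code): ten users with three orders each should yield 30 orders and 30 user-order links.

val source: UserOrderSource = OrderGenerator.makerOrder()
println(source.users.size)     // 10
println(source.orders.size)    // 30
println(source.userOrder.size) // 30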
           

2.3 Operating on DataFrames with Spark SQL

The code is as follows:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * DataFrame operations on user orders
  */
object UserOrderDataFrame {
    var userOrderSource: UserOrderSource = null

    def main(args: Array[String]): Unit = {
        println("this is about user order dataframe for spark example.===================")
        try {
            // Generate the user and order data
            userOrderSource = OrderGenerator.makerOrder()
            // Initialize Spark
            initSpark()
            dataFrameForSql()
            dataFrameForJoin()
        } catch {
            // Exception is a subtype of Throwable, so one handler is enough
            case e: Throwable =>
                e.printStackTrace()
        } finally {
            SparkSession.builder().getOrCreate().close()
        }
        println("this is one user order dataframe for spark example.===================")
    }

    def initSpark(): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setMaster("local[1]").setAppName("UserOrderDataFrameExample")
        val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    }

    /**
      * Query the DataFrames with SQL
      */
    def dataFrameForSql(): Unit = {
        val sparkSession: SparkSession = SparkSession.builder().getOrCreate()
        // Create the DataFrames
        val dfUsers: DataFrame = sparkSession.createDataFrame[User](userOrderSource.users)
        val dfOrders: DataFrame = sparkSession.createDataFrame[Order](userOrderSource.orders)
        val dfUserOrders: DataFrame = sparkSession.createDataFrame[UserOrder](userOrderSource.userOrder)
        dfUsers.printSchema()
        dfOrders.printSchema()
        dfUserOrders.printSchema()
        // Register temporary views
        dfUsers.createOrReplaceTempView("User")
        dfOrders.createOrReplaceTempView("Order")
        dfUserOrders.createOrReplaceTempView("UserOrder")
        // Join the three views with Spark SQL
        val sql = "SELECT T1.*,T2.*,T3.* FROM User T1 INNER JOIN UserOrder T2 ON T1.userId=T2.userId INNER JOIN Order T3 ON T2.orderId=T3.orderId"
        val dfResult: DataFrame = sparkSession.sql(sql)
        println("SparkSQL join result=================================")
        dfResult.printSchema()
        dfResult.show(100)
    }
}
           

Running the code above produces the following output:

root
 |-- userId: integer (nullable = false)
 |-- userName: string (nullable = true)
 |-- tel: string (nullable = true)

root
 |-- orderId: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- money: double (nullable = false)

root
 |-- userId: integer (nullable = false)
 |-- orderId: string (nullable = true)

2018-05-30 18:18:06 INFO SparkSqlParser: Parsing command: User
2018-05-30 18:18:06 INFO SparkSqlParser: Parsing command: Order
2018-05-30 18:18:06 INFO SparkSqlParser: Parsing command: UserOrder
2018-05-30 18:18:06 INFO SparkSqlParser: Parsing command: SELECT T1.*,T2.*,T3.* FROM User T1 INNER JOIN UserOrder T2 ON T1.userId=T2.userId INNER JOIN Order T3 ON T2.orderId=T3.orderId
SparkSQL join result=================================
root
 |-- userId: integer (nullable = false)
 |-- userName: string (nullable = true)
 |-- tel: string (nullable = true)
 |-- userId: integer (nullable = false)
 |-- orderId: string (nullable = true)
 |-- orderId: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- money: double (nullable = false)

+------+-----------+-----------+------+--------------------+--------------------+--------------------+------+
|userId|   userName|        tel|userId|             orderId|             orderId|                time| money|
+------+-----------+-----------+------+--------------------+--------------------+--------------------+------+
|     1| UserName-1|18823458890|     1|f44be3a5-8e97-47f...|f44be3a5-8e97-47f...|2018-05-30 18:17:...|513.33|
|     1| UserName-1|18823458890|     1|ef516070-acec-4ee...|ef516070-acec-4ee...|2018-05-30 18:17:...|400.59|
|     1| UserName-1|18823458890|     1|ef004d7d-b0d8-4d7...|ef004d7d-b0d8-4d7...|2018-05-30 18:17:...|336.88|
|     2| UserName-2|18823458891|     2|f45b08ff-d04e-408...|f45b08ff-d04e-408...|2018-05-30 18:17:...|693.88|
|     2| UserName-2|18823458891|     2|e2d6fb42-1558-42e...|e2d6fb42-1558-42e...|2018-05-30 18:17:...|278.61|
|     2| UserName-2|18823458891|     2|9bc923d4-528a-4dc...|9bc923d4-528a-4dc...|2018-05-30 18:17:...|719.47|
|     3| UserName-3|18823458892|     3|e0dfd87b-36e5-49f...|e0dfd87b-36e5-49f...|2018-05-30 18:17:...|425.51|
|     3| UserName-3|18823458892|     3|149f0659-12c2-492...|149f0659-12c2-492...|2018-05-30 18:17:...|129.92|
|     3| UserName-3|18823458892|     3|c155a1c8-72b4-4f3...|c155a1c8-72b4-4f3...|2018-05-30 18:17:...|258.57|
|     4| UserName-4|18823458893|     4|54b0c494-a096-4e6...|54b0c494-a096-4e6...|2018-05-30 18:17:...|955.21|
|     4| UserName-4|18823458893|     4|bc6a24b8-6d77-4a5...|bc6a24b8-6d77-4a5...|2018-05-30 18:17:...| 69.42|
|     4| UserName-4|18823458893|     4|b4d68db1-7b02-44d...|b4d68db1-7b02-44d...|2018-05-30 18:17:...|571.32|
|     5| UserName-5|18823458894|     5|a6ccd370-494c-4f2...|a6ccd370-494c-4f2...|2018-05-30 18:17:...| 72.92|
|     5| UserName-5|18823458894|     5|646adb1c-73f1-44e...|646adb1c-73f1-44e...|2018-05-30 18:17:...| 69.54|
|     5| UserName-5|18823458894|     5|532be792-5343-47a...|532be792-5343-47a...|2018-05-30 18:17:...|179.44|
|     6| UserName-6|18823458895|     6|78842349-3c60-486...|78842349-3c60-486...|2018-05-30 18:17:...|111.39|
|     6| UserName-6|18823458895|     6|c4cda44d-42ae-46c...|c4cda44d-42ae-46c...|2018-05-30 18:17:...|111.26|
|     6| UserName-6|18823458895|     6|26b90354-7e46-482...|26b90354-7e46-482...|2018-05-30 18:17:...|336.82|
|     7| UserName-7|18823458896|     7|b0e51c7b-7538-4c6...|b0e51c7b-7538-4c6...|2018-05-30 18:17:...|399.73|
|     7| UserName-7|18823458896|     7|fd8acde2-b115-485...|fd8acde2-b115-485...|2018-05-30 18:17:...|295.53|
|     7| UserName-7|18823458896|     7|2c233d01-59fa-430...|2c233d01-59fa-430...|2018-05-30 18:17:...| 52.69|
|     8| UserName-8|18823458897|     8|a73308fd-f3de-4e4...|a73308fd-f3de-4e4...|2018-05-30 18:17:...| 91.96|
|     8| UserName-8|18823458897|     8|a21deab3-8d88-493...|a21deab3-8d88-493...|2018-05-30 18:17:...|343.63|
|     8| UserName-8|18823458897|     8|25092940-ecde-487...|25092940-ecde-487...|2018-05-30 18:17:...|860.76|
|     9| UserName-9|18823458898|     9|5f2298bf-0859-425...|5f2298bf-0859-425...|2018-05-30 18:17:...|907.78|
|     9| UserName-9|18823458898|     9|cb71a2f9-f973-4ad...|cb71a2f9-f973-4ad...|2018-05-30 18:17:...|666.09|
|     9| UserName-9|18823458898|     9|f64b4ede-7faa-421...|f64b4ede-7faa-421...|2018-05-30 18:17:...|134.23|
|    10|UserName-10|18823458899|    10|2eb50d4e-5230-487...|2eb50d4e-5230-487...|2018-05-30 18:17:...|957.02|
|    10|UserName-10|18823458899|    10|faa13220-d459-4b4...|faa13220-d459-4b4...|2018-05-30 18:17:...|888.55|
|    10|UserName-10|18823458899|    10|8d07cc86-9b13-4d2...|8d07cc86-9b13-4d2...|2018-05-30 18:17:...|228.51|
+------+-----------+-----------+------+--------------------+--------------------+--------------------+------+
           

2.4 Joining data with the DataFrame join method

The code (a method on the UserOrderDataFrame object above) is as follows:

    /**
      * Join DataFrames with the DataFrame join method
      */
    def dataFrameForJoin(): Unit = {
        val sparkSession: SparkSession = SparkSession.builder().getOrCreate()
        // Create the DataFrames
        val dfUsers: DataFrame = sparkSession.createDataFrame[User](userOrderSource.users)
        val dfOrders: DataFrame = sparkSession.createDataFrame[Order](userOrderSource.orders)
        val dfUserOrders: DataFrame = sparkSession.createDataFrame[UserOrder](userOrderSource.userOrder)
        dfUsers.printSchema()
        dfOrders.printSchema()
        dfUserOrders.printSchema()
        val dfResult: DataFrame = dfUsers.join(dfUserOrders, "userId").join(dfOrders, "orderId")
        println("DataFrame join result=================================")
        dfResult.printSchema()
        dfResult.show(100)
    }
           

The output:

root
 |-- userId: integer (nullable = false)
 |-- userName: string (nullable = true)
 |-- tel: string (nullable = true)

root
 |-- orderId: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- money: double (nullable = false)

root
 |-- userId: integer (nullable = false)
 |-- orderId: string (nullable = true)

DataFrame join result=================================
root
 |-- orderId: string (nullable = true)
 |-- userId: integer (nullable = false)
 |-- userName: string (nullable = true)
 |-- tel: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- money: double (nullable = false)

+--------------------+------+-----------+-----------+--------------------+------+
|             orderId|userId|   userName|        tel|                time| money|
+--------------------+------+-----------+-----------+--------------------+------+
|f44be3a5-8e97-47f...|     1| UserName-1|18823458890|2018-05-30 18:17:...|513.33|
|ef516070-acec-4ee...|     1| UserName-1|18823458890|2018-05-30 18:17:...|400.59|
|ef004d7d-b0d8-4d7...|     1| UserName-1|18823458890|2018-05-30 18:17:...|336.88|
|f45b08ff-d04e-408...|     2| UserName-2|18823458891|2018-05-30 18:17:...|693.88|
|e2d6fb42-1558-42e...|     2| UserName-2|18823458891|2018-05-30 18:17:...|278.61|
|9bc923d4-528a-4dc...|     2| UserName-2|18823458891|2018-05-30 18:17:...|719.47|
|e0dfd87b-36e5-49f...|     3| UserName-3|18823458892|2018-05-30 18:17:...|425.51|
|149f0659-12c2-492...|     3| UserName-3|18823458892|2018-05-30 18:17:...|129.92|
|c155a1c8-72b4-4f3...|     3| UserName-3|18823458892|2018-05-30 18:17:...|258.57|
|54b0c494-a096-4e6...|     4| UserName-4|18823458893|2018-05-30 18:17:...|955.21|
|bc6a24b8-6d77-4a5...|     4| UserName-4|18823458893|2018-05-30 18:17:...| 69.42|
|b4d68db1-7b02-44d...|     4| UserName-4|18823458893|2018-05-30 18:17:...|571.32|
|a6ccd370-494c-4f2...|     5| UserName-5|18823458894|2018-05-30 18:17:...| 72.92|
|646adb1c-73f1-44e...|     5| UserName-5|18823458894|2018-05-30 18:17:...| 69.54|
|532be792-5343-47a...|     5| UserName-5|18823458894|2018-05-30 18:17:...|179.44|
|78842349-3c60-486...|     6| UserName-6|18823458895|2018-05-30 18:17:...|111.39|
|c4cda44d-42ae-46c...|     6| UserName-6|18823458895|2018-05-30 18:17:...|111.26|
|26b90354-7e46-482...|     6| UserName-6|18823458895|2018-05-30 18:17:...|336.82|
|b0e51c7b-7538-4c6...|     7| UserName-7|18823458896|2018-05-30 18:17:...|399.73|
|fd8acde2-b115-485...|     7| UserName-7|18823458896|2018-05-30 18:17:...|295.53|
|2c233d01-59fa-430...|     7| UserName-7|18823458896|2018-05-30 18:17:...| 52.69|
|a73308fd-f3de-4e4...|     8| UserName-8|18823458897|2018-05-30 18:17:...| 91.96|
|a21deab3-8d88-493...|     8| UserName-8|18823458897|2018-05-30 18:17:...|343.63|
|25092940-ecde-487...|     8| UserName-8|18823458897|2018-05-30 18:17:...|860.76|
|5f2298bf-0859-425...|     9| UserName-9|18823458898|2018-05-30 18:17:...|907.78|
|cb71a2f9-f973-4ad...|     9| UserName-9|18823458898|2018-05-30 18:17:...|666.09|
|f64b4ede-7faa-421...|     9| UserName-9|18823458898|2018-05-30 18:17:...|134.23|
|2eb50d4e-5230-487...|    10|UserName-10|18823458899|2018-05-30 18:17:...|957.02|
|faa13220-d459-4b4...|    10|UserName-10|18823458899|2018-05-30 18:17:...|888.55|
|8d07cc86-9b13-4d2...|    10|UserName-10|18823458899|2018-05-30 18:17:...|228.51|
+--------------------+------+-----------+-----------+--------------------+------+
           

Joining with the DataFrame join method returns the same rows as the Spark SQL version. One difference worth noting in the schemas above: joining on a column name (USING semantics) keeps a single copy of the join key, whereas the SQL query's SELECT T1.*,T2.*,T3.* repeats userId and orderId. Beyond inner joins, the join method also accepts a join-type argument such as "left_outer" or "right_outer", which produce the same results as SQL LEFT OUTER JOIN and RIGHT OUTER JOIN.
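
As a minimal sketch, reusing the dfUsers and dfUserOrders DataFrames from section 2.4, a left outer join looks like this:

// Left outer join: keep every user, even those without a matching order.
// "left_outer" is one of the join-type strings accepted by DataFrame.join.
val dfLeft: DataFrame = dfUsers.join(dfUserOrders, Seq("userId"), "left_outer")
dfLeft.show(100)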

Spark SQL makes it quick to join data across multiple DataFrames, which is a great convenience for users accustomed to SQL. For aggregation and statistics, Spark SQL also saves a great deal of hand-written collection-manipulation code.
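
For example, the following sketch reuses the temporary views registered in section 2.3 to compute per-user order counts and totals; the aliases orderCount and totalMoney are illustrative, not from the original example.

// Per-user order count and total spend in a single SQL statement.
val dfTotals: DataFrame = sparkSession.sql(
    """SELECT T1.userId, T1.userName,
      |       COUNT(T3.orderId) AS orderCount,
      |       SUM(T3.money)     AS totalMoney
      |FROM User T1
      |INNER JOIN UserOrder T2 ON T1.userId = T2.userId
      |INNER JOIN Order T3 ON T2.orderId = T3.orderId
      |GROUP BY T1.userId, T1.userName""".stripMargin)
dfTotals.show()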

Spark SQL is fast, easy to use, and general-purpose, and it runs wherever Spark runs, which is why it has won over so many developers.

