天天看點

Spark SQL學習Spark SQL學習

文章目錄

  • Spark SQL學習
    • 1.SparkSQL概述
      • 1.1.特點
        • 1.1.1.Integrated(可內建的)
        • 1.1.2.Uniform Data Access(統一的資料通路方式)
        • 1.1.3.Hive Integration(可內建Hive)
        • 1.1.4.Standard Connectivity(标準連接配接方式)
    • 2.SparkSQL初體驗
      • 2.1.SparkSession
      • 2.2.基本體驗操作
    • 3.程式設計模型
      • 3.1.DataFrame
      • 3.2.Dataset
    • 4.SparkSQL程式設計
      • 4.1.加載依賴
      • 4.2.程式入口——SparkSession
      • 4.3.DataFrame基本操作
      • 4.4.SQL基本操作
    • 5.RDD/list和DataFrame的轉換
      • 5.1.使用反射方式進行轉化
        • 5.1.1.List-2-dataframe
        • 5.1.2.Rdd-2-dataframe
      • 5.2.使用動态程式設計的方式進行轉化
        • 5.2.1.List-2-dataframe
        • 5.2.2.Rdd-2-dataframe
        • 5.3.Dataframe-2-rdd
    • 6.List/RDD和Dataset的轉化
        • 6.1.1.List-2-dataset
        • 6.1.2.Rdd-2-dataset
        • 6.1.3.List/RDD-2-dataset
        • 6.1.4.其它轉化
    • 7.SparkSQL資料源和落地方式
      • 7.1.各種各樣的資料源
      • 7.2.各種各樣的落地方式
    • 8.SparkSQL和Hive的整合
    • 9.SparkSQL之函數操作
      • 9.1.常用函數
        • 9.1.1.按照功能做分類
        • 9.1.2.按照參數分類
        • 9.1.3.練習
      • 9.2.自定義函數
        • 9.2.1.自定義UDF
        • 9.2.2.自定義UDAF
      • 9.3.特殊函數
    • 10.SparkSQL和SparkCore整合案例
    • 11.SparkSQL以及wordcount的資料傾斜處理
      • 11.1.SparkSQL編寫wordcount
      • 11.2.解決groupBy産生的資料傾斜

Spark SQL學習

1.SparkSQL概述

Spark SQL學習Spark SQL學習

SparkSQL是能夠操作結構化資料的spark中的module子產品。

1.1.特點

1.1.1.Integrated(可內建的)

SparkSQL可以和Spark的程式混合在一起使用。

Spark SQL學習Spark SQL學習

1.1.2.Uniform Data Access(統一的資料通路方式)

Connect to any data source the same way.

DataFrames and SQL provide a common way to access a variety of data sources, including Hive, Avro, Parquet, ORC, JSON, and JDBC. You can even join data across these sources.

SparkSQL能夠對各種各樣的資料提供一個統一的資料通路方式。

Spark SQL學習Spark SQL學習

1.1.3.Hive Integration(可內建Hive)

Run SQL or HiveQL queries on existing warehouses.

Spark SQL supports the HiveQL syntax as well as Hive SerDes and UDFs, allowing you to access existing Hive warehouses.

1.1.4.Standard Connectivity(标準連接配接方式)

Connect through JDBC or ODBC.

A server mode provides industry standard JDBC and ODBC connectivity for business intelligence tools.

能夠提供标準的jdbc和odbc作為用戶端的連接配接方式,類似于mysql-server/hiveserver2.

一句話總結:

SparkSQL就是建構在SparkCore基礎之上的能夠使用SQL去操作結構化資料的Spark的module子產品。

2.SparkSQL初體驗

Spark-shell來體驗sparksql

2.1.SparkSession

統一的程式設計入口

Spark SQL學習Spark SQL學習

2.2.基本體驗操作

Spark SQL學習Spark SQL學習
Spark SQL學習Spark SQL學習

3.程式設計模型

SparkSQL的主要程式設計模型,就是SQL、DataFrame以及Dataset。

我們把RDD稱為Spark的第一代程式設計模型,DataFrame稱為第二代程式設計模型,Dataset稱為第三代程式設計模型。

3.1.DataFrame

DataFrame相比較于RDD,就比RDD多了一行schema描述資訊,這個schema可以了解為表頭等中繼資料資訊。

官方解釋:

Spark SQL學習Spark SQL學習

一句話總結:DataFrame就是結構化資料庫中的一張二維表。

3.2.Dataset

Dataset是RDD和DataFrame的集大成者,內建二者優點(RDD中的強類型推斷,強大的lambda表達式和DataFrame中的執行引擎的優化的能力)

官方解釋:

Spark SQL學習Spark SQL學習

一句話:Dataset=RDD+DataFrame

總結:相比 DataFrame,Dataset 提供了編譯時類型檢查,對于分布式程式來講,送出一次作業太費勁了(要編譯、打包、上傳、運作),到送出到叢集運作時才發現錯誤,影響開發進度,這也是引入 Dataset 的一個重要原因。

4.SparkSQL程式設計

4.1.加載依賴

<!-- sparksql-->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
<!--sparksql-hive-->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
           

4.2.程式入口——SparkSession

Spark2.x以前,sparkSQL的程式入口——SQLContext,如果要想操作Hive,就使用HiveContext,Spark2.x之後,SparkSQL的入口就統一切換到SparkSession中來了。

Spark SQL學習Spark SQL學習

4.3.DataFrame基本操作

Spark SQL學習Spark SQL學習

4.4.SQL基本操作

Spark SQL學習Spark SQL學習

5.RDD/list和DataFrame的轉換

5.1.使用反射方式進行轉化

5.1.1.List-2-dataframe

Spark SQL學習Spark SQL學習

5.1.2.Rdd-2-dataframe

Spark SQL學習Spark SQL學習

5.2.使用動态程式設計的方式進行轉化

5.2.1.List-2-dataframe

Spark SQL學習Spark SQL學習

5.2.2.Rdd-2-dataframe

Spark SQL學習Spark SQL學習

5.3.Dataframe-2-rdd

Spark SQL學習Spark SQL學習

6.List/RDD和Dataset的轉化

6.1.1.List-2-dataset

Spark SQL學習Spark SQL學習

修改正确之後:

Spark SQL學習Spark SQL學習

6.1.2.Rdd-2-dataset

Spark SQL學習Spark SQL學習

6.1.3.List/RDD-2-dataset

Spark SQL學習Spark SQL學習

但是必須要引入對應的spark的隐士轉化

Spark SQL學習Spark SQL學習

6.1.4.其它轉化

Spark SQL學習Spark SQL學習

7.SparkSQL資料源和落地方式

7.1.各種各樣的資料源

使用預設的方式加載資料

Spark SQL學習Spark SQL學習

預設加載的檔案格式為parquet===》

Spark SQL學習Spark SQL學習

讀取指定格式的檔案

Spark SQL學習Spark SQL學習

7.2.各種各樣的落地方式

Spark SQL學習Spark SQL學習

===>jdbc

Spark SQL學習Spark SQL學習

8.SparkSQL和Hive的整合

/*
* SparkSQL和Hive的整合操作 
* 需求:
*     在hive資料庫db-1808中有兩張表
*         teacher_info
*             tname, height
*             zhangsan,175
*         teacher_basic
*             tname,age,married,children
*             zhangsan,23,false,0
*     需要完成的操作是:
*         1、使用sparksql建立資料db_1808
*         2、在db_1808中建立兩張表teacher_info和teacher_basic,
*             并加載對應的資料
*         3、對這兩張表進行關聯查詢,将最終結果存儲到hive中的上述第一步建立的資料庫中,表名為teacher
*          create table if not exists teacher as
*          select
*             b.tname,
*             b.age,
*             b.married,
*             b.children,
*             i.height
*          from teacher_info i
*          left join teacher_basic b on i.tname = b.tname;
*
*          insert <into|overwrite> table teacher
*          select
*             b.tname,
*             b.age,
*             b.married,
*             b.children,
*             i.height
*          from teacher_info i
*          left join teacher_basic b on i.tname = b.tname;
* /
           

第一步:建構Spark和Hive的整合,就需要在SparkSession中添加支援Hive.

Spark SQL學習Spark SQL學習

第二步:便可展開hive的操作,就想在hive終端執行操作一樣。

// 1、使用sparksql建立資料db_1808
spark.sql("create database `db_1808`")
spark.sql("use `db_1808`")
// 2 在db_1808中建立兩張表teacher_info和teacher_basic,并加載對應的資料
spark.sql(
    """
      |create table if not exists `db_1808`.`teacher_info` (
      |  tname string,
      |  height double
      |) row format delimited
      |fields terminated by ','
    """.stripMargin)
spark.sql("load data inpath 'hdfs://ns1/data/teacher_info.txt' into table `db_1808`.`teacher_info`")
spark.sql(
    """
      |create table if not exists `db_1808`.`teacher_basic` (
      |  tname string,
      |  age int,
      |  married boolean,
      |  children int
      |) row format delimited
      |fields terminated by ','
    """.stripMargin)
spark.sql("load data inpath 'hdfs://ns1/data/teacher_basic.txt' into table `db_1808`.`teacher_basic`")
//3 對這兩張表進行關聯查詢,将最終結果存儲到hive中的上述第一步建立的資料庫中,表名為teacher
val sql =
    """
      |select
      |   b.tname,
      |   b.age,
      |   b.married,
      |   b.children,
      |   i.height
      |from teacher_info i
      |left join teacher_basic b on i.tname = b.tname
    """.stripMargin
val retDF = spark.sql(sql)
retDF.write.mode(SaveMode.Overwrite).saveAsTable("`db_1808`.`teacher`")

//最後在spark安裝執行個體中添加hive的中繼資料資訊:
           

1、将hive/conf目錄下面的hive-site.xml配置檔案拷貝到spark的conf目錄下面

2、将mysql的依賴jar拷貝到spark的jars目錄下面

Spark-submit.sh

Spark SQL學習Spark SQL學習

9.SparkSQL之函數操作

什麼是函數?

函數就是為了完成某一個比較複雜,或者常用功能,對一些列代碼的封裝。

9.1.常用函數

9.1.1.按照功能做分類

數學函數:sin(),abs(),cos(),tan()

統計函數:sum(),avg(),count(),max()

日期函數:date_sub(date1, date2),year(),hour(),

字元串函數:length,substr

開窗函數:row_number() over(),sum() over()

9.1.2.按照參數分類

  • UDF(User Definition Function)

    使用者自定義函數

    一路輸入,一路輸出

    sin,year

  • UDAF(User Definition Aggregation Function)

    使用者自定義聚合函數

    多路輸入,一路輸出

    max,sum,…,

  • UDTF(User Definition Table Function)

    使用者自定義表函數

    一路輸入,多路輸出

    explode

這裡面的“路”,指的就是一行中的某一列。

9.1.3.練習

資料源:person

Spark SQL學習Spark SQL學習

Us_population

Spark SQL學習Spark SQL學習

加載資料的工具類:

Spark SQL學習Spark SQL學習

統計函數操作:

Spark SQL學習Spark SQL學習
Spark SQL學習Spark SQL學習

日期操作:

val pDF: DataFrame = loadTable(spark, "person")
pDF.createOrReplaceTempView("person")
pDF.printSchema()
pDF.show()
/*
    1、統計每年出生的人數
    2、計算李濤和鐘甯靜相差多少天
    3、計算129天之後的鐘甯靜同學芳齡幾何
 */
var sql =
    """
      |select
      |     year(birthday) year,
      |     count(1) num
      |from person
      |group by year(birthday)
    """.stripMargin
spark.sql(sql).show()

sql =
    """
      |SELECT
      |   DATEDIFF(p1.birthday, p2.birthday) birthday_diff
      |FROM person p1 CROSS JOIN person p2
      |WHERE p1.name='李濤'
      |AND p2.name='鐘甯靜'
    """.stripMargin
spark.sql(sql).show()

sql =
    """
      |select
      |  datediff(b1, b2)
      |from (
          |select
          |(SELECT
          |   birthday
          |from person where name ='李濤') b1,
          |(SELECT
          |   birthday b1
          |from person where name ='鐘甯靜') b2
      |) t
    """.stripMargin
spark.sql(sql).show()

/**
  * 計算129天之後的鐘甯靜同學芳齡幾何
  * 年齡=目前的年份-出生的年份
  *     這裡的目前的年份就是129天之後的年份
  *         date_add(current_date(), 129) 129天之後的日期
  *         year(date_add(current_date(), 129)) 129天之後的年份
  *     出生的年份就是birthday中的year
  */
sql =
    """
      |select
      | name,
      | birthday,
      | date_add(current_date(), 129) date_129_later,
      | year(date_add(current_date(), 129)) year,
      | (year(date_add(current_date(), 129)) - year(birthday)) age
      |from person
      |where name ='鐘甯靜'
    """.stripMargin
spark.sql(sql).show()
           

9.2.自定義函數

當系統函數,滿足不了業務需要的時候,就需要自定義函數,完成自定義函數,有如下三個步驟。

1、實作某個類,複寫其中的方法

2、注冊到環境中

3、直接使用

9.2.1.自定義UDF

/*
 *        自己模拟字元串長度的函數length
 *        1、編寫一個方法即可
 *        2、注冊
 *        3、使用
 * /
def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)  Logger.getLogger("org.spark-project").setLevel(Level.WARN)
    val spark = SparkSession.builder()
        .appName("_02SparkSQLUDFOps")
        .master("local[*]")
        .getOrCreate()
    // 2、注冊(注冊之後的函數名,函數的引用)
    spark.udf.register[Int, String]("strLen", str => myStrLen(str))
    val jdbcDF = loadTable(spark, "us_population")
    jdbcDF.createOrReplaceTempView("us_population")
    jdbcDF.printSchema()
    jdbcDF.show()
    //3 使用
    val sql =
        """
          |select
          | city,
          | length(city) city_len,
          | strLen(city) city_len
          |from us_population
        """.stripMargin
    spark.sql(sql).show()

    spark.stop()
}
// 1、編寫一個方法即可
private def myStrLen(str:String) = str.length

private def loadTable(spark: SparkSession, table:String) = {
    val url = "jdbc:mysql://localhost:3306/test"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "sorry")
    val jdbcDF = spark.read.jdbc(url, table, properties)
    jdbcDF
}
           

9.2.2.自定義UDAF

1、實作某個類,複寫其中的方法

/**
  * 學習這個UserDefinedAggregateFunction類,大家一定要參考在SparkCore中學習過的
  * combineByKey和aggregateByKey
  */
class MySum extends UserDefinedAggregateFunction {
    //該udaf的輸入參數的schema類型
    override def inputSchema: StructType = {
        StructType(
            List(StructField("population", DataTypes.IntegerType, false))
        )
    }

    /*
        目前UDAF傳回值的資料類型
     */
    override def dataType: DataType = DataTypes.IntegerType

     /*
      * 聚合過程中的緩沖區域的資料類型Schema
      */
    override def bufferSchema: StructType = {
        StructType(
            List(StructField("population", DataTypes.IntegerType, false))
        )
    }
    //輸入輸出确定性判斷,比如,輸入輸出類型是一緻或者确定,傳回為true
    override def deterministic: Boolean = true
    /*
        初始化的操作
        相當于aggregateByKey中的createCombiner方法
     */
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer.update(0, 0)
    }

    /**
      *  分區内的合并操作
      * @param buffer   initialize建構的臨時緩沖區,用于記錄合并結果
      * @param input    就是每次新輸入的要聚合的值
      * partition-1
      * var sum1 = 0 //initialize
      * for(i <- 0 until 10) { //i 就是每次新輸入的要聚合的值
      *     sum1 += i //update
      * }
      *
      *partition-2
      * var sum2 = 0 //initialize
      * for(i <- 0 until 10) {
      *     sum2 = sum2 + i //update
      * }
      */
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val oldValue= buffer.getInt(0)
        val newValue = input.getInt(0)
        val mergeValue = oldValue + newValue
        buffer.update(0, mergeValue)
    }

    /**
      * 分區間的合并操作
      * @param buffer1 分區一中聚合得到的臨時結果1
      * @param buffer2 分區二中聚合得到的臨時結果2
      * partition-1
      * var sum1 = 0 //initialize
      * for(i <- 0 until 10) { //i 就是每次新輸入的要聚合的值
      *     sum1 += i //update
      * }
      *
      *partition-2
      * var sum2 = 0 //initialize
      * for(i <- 0 until 10) {
      *     sum2 = sum2 + i //update
      * }
      *
      * sum1+sum2
      */
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        val old = buffer1.getInt(0)
        val newV = buffer2.getInt(0)
        val totalV = old + newV
        buffer1.update(0, totalV)
    }

    //擷取目前聚合操作的最終結果值
    override def evaluate(buffer: Row): Any = buffer.getInt(0)
}
           

2、注冊到環境中

3、直接使用

object _03SparkSQLUDAFOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.spark-project").setLevel(Level.WARN)
        val spark = SparkSession.builder()
            .appName("_03SparkSQLUDAFOps")
            .master("local[*]")
            .getOrCreate()
        //2 注冊
        spark.udf.register("mySum", new MySum)
        val jdbcDF = loadTable(spark, "us_population")
        jdbcDF.createOrReplaceTempView("us_population")
        jdbcDF.printSchema()
        jdbcDF.show()
        //3 使用
        val sql =
            """
              |select
              | state,
              | sum(population) total_population,
              | mySum(population) m_total_population
              |from us_population
              |group by state
            """.stripMargin
        spark.sql(sql).show()

        spark.stop()
    }

    private def loadTable(spark: SparkSession, table:String) = {
        val url = "jdbc:mysql://localhost:3306/test"
        val properties = new Properties()
        properties.put("user", "root")
        properties.put("password", "sorry")
        val jdbcDF = spark.read.jdbc(url, table, properties)
        jdbcDF
    }
}
           

9.3.特殊函數

開窗函數row_number()\sum() over()\max() over(),好處,直接在字段上進行統計,這樣就不需要在表後面再用group by的。

Row_number

**
  *  http://www.bejson.com/otherformat/sql/
  * SparkSQL之開窗函數操作
  * row_number() over()
  *     分組topn
  *     course  name    score
  *     chinese ls 91
        english ww 56
        chinese zs 90
        chinese zl 76
        english zq 88
        chinese wb 95
        chinese sj 74
        english ts 87
        english ys 67
        english mz 77
        chinese yj 98
        english gk 96
    求取各科目成績的前三名

  * sum() over()
  */
object _04SparkSQLWindowOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.spark-project").setLevel(Level.WARN)
        val spark = SparkSession.builder()
            .appName("_04SparkSQLWindowOps")
            .master("local[*]")
            .getOrCreate()

        val linesDF = spark.read.text("file:///E:/data/spark/topn.txt")
        linesDF.printSchema()
        linesDF.show()
        import spark.implicits._
        val stuDS:Dataset[Student] = linesDF.map(row => {
            val line = row.getString(0)
            val fields = line.split("\\s+")
            val course = fields(0)
            val name = fields(1)
            val score = fields(2).trim.toInt
            Student(course, name, score)
        })

        stuDS.show()
        stuDS.createOrReplaceTempView("student")//注冊成為臨時表
        var sql =
            """
              |select
              |    course,
              |    name,
              |    score,
              |    row_number() over(partition by course order by score desc) rank
              |from student
              |having rank < 4
            """.stripMargin
        spark.sql(sql).show()

        //建議大家使用子查詢代替having操作
        println("建議大家使用子查詢代替having操作")
        sql =
            """
              |select
              | tmp.*
              |from(
              |select
              |    course,
              |    name,
              |    score,
              |    row_number() over(partition by course order by score desc) rank
              |from student) tmp
              |where tmp.rank < 4
            """.stripMargin
        spark.sql(sql).show()
        spark.stop()
    }
}
case class Student(course:String, name:String, score:Int)

Sum() over()
/**
  *  http://www.bejson.com/otherformat/sql/
  * SparkSQL之開窗函數操作
  * row_number() over()
  * sum()/max()/min/avg over()
  *     可以實作累計求和的過程
  */
object _05SparkSQLWindowOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.spark-project").setLevel(Level.WARN)
        val spark = SparkSession.builder()
            .appName("_05SparkSQLWindowOps")
            .master("local[*]")
            .getOrCreate()

        val jsonDF = spark.read.json("file:///E:/data/spark/sql/product_info.json")
        jsonDF.printSchema()
        jsonDF.show()
        jsonDF.createOrReplaceTempView("product")
        val sql =
            """
              |select
              |   product_code,
              |   event_date,
              |   duration,
              |   sum(duration) over(partition by product_code order by event_date) `group_sum`,
              |   sum(duration) over(order by event_date) `sum`
              |from product
            """.stripMargin
        spark.sql(sql).show()
        spark.stop()
    }
}
           

10.SparkSQL和SparkCore整合案例

/**
  * SparkSQL+SparkCore的整合案例
  *  基礎資料:
  *     date        name    keyword province  client  searchType
  *     2018-11-13 tom    china  beijing    pc web
  * 2018-11-13 tom    news   tianjing   pc web
  *  需求:
  *     統計每天使用者檢索關鍵字的Top3,要求:每個使用者在每一天對同一個關鍵字的多次檢索,隻能統計一次。
  *
  *  分析思路:
  *     根據題意,其實就是分組求topN,這裡的分組,按照什麼分組?按照date進行分組,每一天有若幹個
  *     單詞的統計,如果統計關鍵字?
  *      為了避免關鍵字跨天被重複統計,那麼就需要使用日期+關鍵字作為符合key,來求出每一天的關鍵字出現的次數。
  *      是以在統計過程中,要對同一個使用者檢索的關鍵字進行去重
  *      1、将該使用者在某一天的所有的關鍵字都拉取過來,然後去重,
  *      2、以date+keyword+user作為rdd内容(distinct),第二步,資料轉化為<date+keyword, 1> 統計
  *      便可計算出某一天的某一個關鍵字的檢索量。
  *
  *      檢索出每一天每個關鍵字的量,之後便分組topN(row_number搞定即可)
  *
  */
object _06SparkSQLAndCoreTest {
    def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
            Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
            Logger.getLogger("org.spark-project").setLevel(Level.WARN)
            val spark = SparkSession.builder()
                .appName("_06SparkSQLAndCoreTest")
                .master("local[*]")
                .getOrCreate()

            import spark.implicits._
            val lines = spark.sparkContext.textFile("file:///E:/data/spark/sql/dailykey.txt")

            // date        name    keyword province  client  searchType
//            *     2018-11-13 tom    china  beijing    pc web
            //date+keyword+user
            val baseRDD:RDD[String] = lines.map(line => {
                    val fields = line.split("\\s+")
                    val date = fields(0)
                    val user = fields(1)
                    val keyword = fields(2)
                    s"${date}|${keyword}|${user}"
            })
            println("========>原始資料經過轉化之後的内容")
            baseRDD.foreach(println)
            println("=========>去重之後的結果=====================")
            val distinctRDD:RDD[String] = baseRDD.distinct()
            distinctRDD.foreach(println)
            println("=========>統計每天每個關鍵字被檢索的次數=====================")
            val keyCount:RDD[(String, Int)] = distinctRDD.map(fhkey => {
                    val key = fhkey.substring(0, fhkey.lastIndexOf("|"))
                    (key, 1)
            }).reduceByKey(_ + _)
            keyCount.foreach(println)
            println("=========>每天關鍵字檢索Top3=====================")
            val finalRDD = keyCount.map { case (key, count) => {
                    val fields = key.split("\\|")
                    val date = fields(0)
                    val keyword = fields(1)
                    MyRow(date, keyword, count)
            }}

            val ds:Dataset[MyRow] = finalRDD.toDS()
            ds.createOrReplaceTempView("daily_keyword_tmp")
            ds.printSchema()
            val sql =
                """
                  |select
                  | tmp.*
                  |from (
                  |select
                  |  `date`,
                  |  keyword,
                  |  `count`,
                  |  row_number() over(partition by `date` order by `count` desc) rank
                  |from daily_keyword_tmp
                  |) tmp
                  |where tmp.rank < 4
                """.stripMargin


            spark.sql(sql).show()
            spark.stop()
    }
}
case class MyRow(date:String, keyword:String, count:Int)
           

11.SparkSQL以及wordcount的資料傾斜處理

11.1.SparkSQL編寫wordcount

/**
  * 使用SparkSQL來統計wordcount
  */
object _07SparkSQLWordCountOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.spark-project").setLevel(Level.WARN)
        val spark = SparkSession.builder()
            .appName("_07SparkSQLWordCountOps")
            .master("local[*]")
            .getOrCreate()
        val linesDF = spark.read.text("file:///E:/data/spark/core/hello.txt").toDF("line")
        linesDF.createOrReplaceTempView("test")
        linesDF.printSchema()
        linesDF.show()

        //求wordcount
        //step 1、将每一行的資料進行拆分
        println(">>>>step 1、将每一行的資料進行拆分")
        val flatMapSQL =
            """
              |select
              | split(line, '\\s+') words
              |from test
            """.stripMargin
        spark.sql(flatMapSQL).show()
        //step 2、強一個數組轉化為為多行,使用explode函數
        println(">>>>step 2、強一個數組轉化為為多行,使用explode函數")
        val explodeSQL =
            """
              |select
              | explode(split(line, '\\s+')) word
              |from test
            """.stripMargin
        spark.sql(explodeSQL).show()
        //step 3、分組統計
        val groupSQL =
            """
              |select
              |  tmp.word,
              |  count(tmp.word) as countz
              |from (
              | select
              |     explode(split(line, '\\s+')) word
              | from test
              |) tmp
              |group by tmp.word
              |order by countz desc
            """.stripMargin
        spark.sql(groupSQL).show()
        spark.stop()
    }
}
           

11.2.解決groupBy産生的資料傾斜

/**
  * 使用SparkSQL來解決group by操作的資料傾斜
  * 在SparkCore中,使用兩階段聚合來解決dataskew,
  *     局部聚合+全局聚合
  *     對key拆分打散(添加随機字首),在此基礎之上做group by的統計---->局部聚合
  *     在局部聚合的基礎之上,去掉對應的随機字首,再做group by的統計--->全局聚合
  *  雙重group-by
  */
object _08SparkSQLWordCountDataSkewOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.spark-project").setLevel(Level.WARN)
        val spark = SparkSession.builder()
            .appName("_08SparkSQLWordCountDataSkewOps")
            .master("local[*]")
            .getOrCreate()
        val linesDF = spark.read.text("file:///E:/data/spark/core/hello.txt").toDF("line")
        linesDF.createOrReplaceTempView("test")
        linesDF.printSchema()
        linesDF.show()

        //求wordcount
        //step 1、将每一行的資料進行拆分
        println(">>>>step 1、将每一行的資料進行拆分")
        val flatMapSQL =
            """
              |select
              | split(line, '\\s+') words
              |from test
            """.stripMargin
        spark.sql(flatMapSQL).show()
        //step 2、強一個數組轉化為為多行,使用explode函數
        println(">>>>step 2、強一個數組轉化為為多行,使用explode函數")
        val explodeSQL =
            """
              |select
              |   explode(split(line, '\\s+')) word
              |from test
            """.stripMargin
        spark.sql(explodeSQL).show()
        //對拆分出來的每一個單詞添加随機字首,添加4以内的随機字首
        println("step 3 對拆分出來的每一個單詞添加随機字首,添加4以内的随機字首")
        val prefixSQL =
            """
              |select
              |  concat_ws("_", cast(floor(rand(10) * 2) as string), t1.word) as prefix_word
              |from (
              |  select
              |    explode(split(line, '\\s+')) word
              |  from test
              |) t1
            """.stripMargin
        spark.sql(prefixSQL).show()
        println(">>>>>step 4在添加好字首之後做局部聚合")
        val partAggrSQL =
            """
              |select
              | t2.prefix_word,
              | count(t2.prefix_word) as prefix_count
              |from (
              | select
              |  concat_ws("_", cast(floor(rand(10) * 2) as string), t1.word) as prefix_word
              | from (
              |  select
              |    explode(split(line, '\\s+')) word
              |  from test
              | ) t1
              |) t2
              |group by t2.prefix_word
            """.stripMargin
        spark.sql(partAggrSQL).show()
        println(">>>在局部聚合的基礎至少去除字首")
        /*
            去除字首的兩種方式:
                1、select substr("1_baidu", instr("1_baidu", "_") + 1);索引法
                2、select split("1_baidu", "_")[1]; 切割法
         */
        val removePrefixSQL =
            """
              |select
              | t2.prefix_word,
              | substr(t2.prefix_word, instr(t2.prefix_word, "_") + 1) index_m,
              | split(t2.prefix_word, "_")[1] split_m,
              | count(t2.prefix_word) as prefix_count
              |from (
              | select
              |  concat_ws("_", cast(floor(rand(10) * 2) as string), t1.word) as prefix_word
              | from (
              |  select
              |    explode(split(line, '\\s+')) word
              |  from test
              | ) t1
              |) t2
              |group by t2.prefix_word
            """.stripMargin
        spark.sql(removePrefixSQL).show()
        println(">>>>全局聚合統計")
        val fullAggrSQL =
            """
              |select
              |  t3.index_m,
              |  sum(t3.prefix_count) as countz
              |from (
              |select
              | t2.prefix_word,
              | substr(t2.prefix_word, instr(t2.prefix_word, "_") + 1) index_m,
              | split(t2.prefix_word, "_")[1] split_m,
              | count(t2.prefix_word) as prefix_count
              |from (
              | select
              |  concat_ws("_", cast(floor(rand(10) * 2) as string), t1.word) as prefix_word
              | from (
              |  select
              |    explode(split(line, '\\s+')) word
              |  from test
              | ) t1
              |) t2
              |group by t2.prefix_word)
              |t3
              |group by t3.index_m
            """.stripMargin

        spark.sql(fullAggrSQL).show()
        spark.stop()
    }

    private def addPrefix(str:String) = {
        val random = new Random()
        random.nextInt(2) + "_" + str
    }

    private def removePrefix(str:String) = {
        str.substring(str.indexOf("_") + 1)
    }
}
           

繼續閱讀