天天看點

spark系列14:sparksql入門與介紹使用

1. SparkSQL 是什麼

目标

對于一件事的了解, 應該分為兩個大部分, 第一, 它是什麼, 第二, 它解決了什麼問題

  1. 了解為什麼會有 

    SparkSQL

  2. 了解 

    SparkSQL

     所解決的問題, 以及它的使命

1.1. SparkSQL 的出現契機

SparkSQL

 是什麼

主線

  1. 曆史前提
  2. 發展過程
  3. 重要性

資料分析的方式

資料分析的方式大緻上可以劃分為 

SQL

 和 指令式兩種

指令式

在前面的 

RDD

 部分, 非常明顯可以感覺的到是指令式的, 主要特征是通過一個算子, 可以得到一個結果, 通過結果再進行後續計算.

sc.textFile("...")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .collect()           

指令式的優點

  • 操作粒度更細, 能夠控制資料的每一個處理環節
  • 操作更明确, 步驟更清晰, 容易維護
  • 支援非結構化資料的操作

指令式的缺點

  • 需要一定的代碼功底
  • 寫起來比較麻煩

SQL

對于一些資料科學家, 要求他們為了做一個非常簡單的查詢, 寫一大堆代碼, 明顯是一件非常殘忍的事情, 是以 

SQL on Hadoop

 是一個非常重要的方向.

SELECT
	name,
	age,
	school
FROM students
WHERE age > 10           

SQL 的優點

  • 表達非常清晰, 比如說這段 

    SQL

     明顯就是為了查詢三個字段, 又比如說這段 

    SQL

     明顯能看到是想查詢年齡大于 10 歲的條目

SQL 的缺點

  • 想想一下 3 層嵌套的 

    SQL

    , 維護起來應該挺力不從心的吧
  • 試想一下, 如果使用 

    SQL

     來實作機器學習算法, 也挺為難的吧

SQL

 擅長資料分析和通過簡單的文法表示查詢, 指令式操作适合過程式處理和算法性的處理. 在 

Spark

 出現之前, 對于結構化資料的查詢和處理, 一個工具一向隻能支援 

SQL

 或者指令式, 使用者被迫要使用多個工具來适應兩種場景, 并且多個工具配合起來比較費勁.

而 

Spark

 出現了以後, 統一了兩種資料處理範式, 是一種革新性的進步.

因為 

SQL

 是資料分析領域一個非常重要的範式, 是以 

Spark

 一直想要支援這種範式, 而伴随着一些決策失誤, 這個過程其實還是非常曲折的

spark系列14:sparksql入門與介紹使用

Hive

解決的問題

  • Hive

     實作了 

    SQL on Hadoop

    , 使用 

    MapReduce

     執行任務
  • 簡化了 

    MapReduce

     任務

新的問題

  • Hive

     的查詢延遲比較高, 原因是使用 

    MapReduce

     做排程

Shark

  • Shark

     改寫 

    Hive

     的實體執行計劃, 使用 

    Spark

     作業代替 

    MapReduce

     執行實體計劃
  • 使用列式記憶體存儲
  • 以上兩點使得 

    Shark

     的查詢效率很高
  • Shark

     重用了 

    Hive

     的 

    SQL

     解析, 邏輯計劃生成以及優化, 是以其實可以認為 

    Shark

     隻是把 

    Hive

     的實體執行替換為了 

    Spark

     作業
  • 執行計劃的生成嚴重依賴 

    Hive

    , 想要增加新的優化非常困難
  • Hive

     使用 

    MapReduce

     執行作業, 是以 

    Hive

     是程序級别的并行, 而 

    Spark

     是線程級别的并行, 是以 

    Hive

     中很多線程不安全的代碼不适用于 

    Spark

由于以上問題, 

Shark

 維護了 

Hive

 的一個分支, 并且無法合并進主線, 難以為繼

SparkSQL

  • Spark SQL

    Hive

     解析 

    SQL

     生成 

    AST

     文法樹, 将其後的邏輯計劃生成, 優化, 實體計劃都自己完成, 而不依賴 

    Hive

  • 執行計劃和優化交給優化器 

    Catalyst

  • 内建了一套簡單的 

    SQL

     解析器, 可以不使用 

    HQL

    , 此外, 還引入和 

    DataFrame

     這樣的 

    DSL API

    , 完全可以不依賴任何 

    Hive

     的元件
  • Shark

     隻能查詢檔案, 

    Spark SQL

     可以直接降查詢作用于 

    RDD

    , 這一點是一個大進步

對于初期版本的 

SparkSQL

, 依然有挺多問題, 例如隻能支援 

SQL

 的使用, 不能很好的相容指令式, 入口不夠統一等

Dataset

SparkSQL

 在 2.0 時代, 增加了一個新的 

API

, 叫做 

Dataset

Dataset

 統一和結合了 

SQL

 的通路和指令式 

API

 的使用, 這是一個劃時代的進步

在 

Dataset

 中可以輕易的做到使用 

SQL

 查詢并且篩選資料, 然後使用指令式 

API

 進行探索式分析

spark系列14:sparksql入門與介紹使用

SparkSQL

 不隻是一個 

SQL

 引擎, 

SparkSQL

 也包含了一套對 結構化資料的指令式 

API

, 事實上, 所有 

Spark

 中常見的工具, 都是依賴和依照于 

SparkSQL

API

 設計的

總結: 

SparkSQL

SparkSQL

 是一個為了支援 

SQL

 而設計的工具, 但同時也支援指令式的 

API

2. SparkSQL 的适用場景

SparkSQL

 的适用場景

定義 特點 舉例
結構化資料 有固定的 

Schema

有預定義的 

Schema

關系型資料庫的表
半結構化資料 沒有固定的 

Schema

, 但是有結構

Schema

, 有結構資訊, 資料一般是自描述的
指一些有結構的檔案格式, 例如 

JSON

非結構化資料 沒有固定 

Schema

, 也沒有結構

Schema

指文檔圖檔之類的格式

一般指資料有固定的 

Schema

, 例如在使用者表中, 

name

 字段是 

String

 型, 那麼每一條資料的 

name

 字段值都可以當作 

String

 來使用

+----+--------------+---------------------------+-------+---------+
| id | name         | url                       | alexa | country |
+----+--------------+---------------------------+-------+---------+
| 1  | Google       | https://www.google.cm/    | 1     | USA     |
| 2  | 淘寶          | https://www.taobao.com/   | 13    | CN      |
| 3  | 菜鳥教程      | http://www.runoob.com/    | 4689  | CN      |
| 4  | 微網誌          | http://weibo.com/         | 20    | CN      |
| 5  | Facebook     | https://www.facebook.com/ | 3     | USA     |
+----+--------------+---------------------------+-------+---------+           

一般指的是資料沒有固定的 

Schema

, 但是資料本身是有結構的

{
     "firstName": "John",
     "lastName": "Smith",
     "age": 25,
     "phoneNumber":
     [
         {
           "type": "home",
           "number": "212 555-1234"
         },
         {
           "type": "fax",
           "number": "646 555-4567"
         }
     ]
 }           

Schema

指的是半結構化資料是沒有固定的 

Schema

 的, 可以了解為沒有顯式指定 

Schema

比如說一個使用者資訊的 

JSON

 檔案, 第一條資料的 

phone_num

 有可能是 

String

, 第二條資料雖說應該也是 

String

, 但是如果硬要指定為 

BigInt

, 也是有可能的

因為沒有指定 

Schema

, 沒有顯式的強制的限制

有結構

雖說半結構化資料是沒有顯式指定 

Schema

 的, 也沒有限制, 但是半結構化資料本身是有有隐式的結構的, 也就是資料自身可以描述自身

例如 

JSON

 檔案, 其中的某一條資料是有字段這個概念的, 每個字段也有類型的概念, 是以說 

JSON

 是可以描述自身的, 也就是資料本身攜帶有元資訊

SparkSQL

 處理什麼資料的問題?

  • Spark

    RDD

     主要用于處理 非結構化資料 和 半結構化資料
  • SparkSQL

     主要用于處理 結構化資料

SparkSQL

 相較于 

RDD

 的優勢在哪?

  • SparkSQL

     提供了更好的外部資料源讀寫支援
    • 因為大部分外部資料源是有結構化的, 需要在 

      RDD

       之外有一個新的解決方案, 來整合這些結構化資料源
  • SparkSQL

     提供了直接通路列的能力
    • SparkSQL

       主要用做于處理結構化資料, 是以其提供的 

      API

       具有一些普通資料庫的能力

SparkSQL

 适用于什麼場景?

SparkSQL

 适用于處理結構化資料的場景

本章總結

  • SparkSQL

     是一個即支援 

    SQL

     又支援指令式資料處理的工具
  • SparkSQL

     的主要适用場景是處理結構化資料

3. SparkSQL 初體驗

  1. 了解 

    SparkSQL

    API

     由哪些部分組成

3.1. RDD 版本的 WordCount

val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(config)

sc.textFile("hdfs://node01:8020/dataset/wordcount.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .collect           
  • RDD

     版本的代碼有一個非常明顯的特點, 就是它所處理的資料是基本類型的, 在算子中對整個資料進行處理

3.2. 指令式 API 的入門案例

case class People(name: String, age: Int)

val spark: SparkSession = new sql.SparkSession.Builder()       
  .appName("hello")
  .master("local[6]")
  .getOrCreate()

import spark.implicits._

val peopleRDD: RDD[People] = spark.sparkContext.parallelize(Seq(People("zhangsan", 9), People("lisi", 15)))
val peopleDS: Dataset[People] = peopleRDD.toDS()               
val teenagers: Dataset[String] = peopleDS.where('age > 10)     
  .where('age < 20)
  .select('name)
  .as[String]

/*
+----+
|name|
+----+
|lisi|
+----+
*/
teenagers.show()           
SparkSQL 中有一個新的入口點, 叫做 SparkSession
SparkSQL 中有一個新的類型叫做 Dataset
SparkSQL 有能力直接通過字段名通路資料集, 說明 SparkSQL 的 API 中是攜帶 Schema 資訊的

SparkSession

SparkContext

 作為 

RDD

 的建立者和入口, 其主要作用有如下兩點

  • 建立 

    RDD

    , 主要是通過讀取檔案建立 

    RDD

  • 監控和排程任務, 包含了一系列元件, 例如 

    DAGScheduler

    TaskSheduler

為什麼無法使用 

SparkContext

SparkSQL

 的入口?

  • SparkContext

     在讀取檔案的時候, 是不包含 

    Schema

     資訊的, 因為讀取出來的是 

    RDD

  • SparkContext

     在整合資料源如 

    Cassandra

    JSON

    Parquet

     等的時候是不靈活的, 而 

    DataFrame

     和 

    Dataset

     一開始的設計目标就是要支援更多的資料源
  • SparkContext

     的排程方式是直接排程 

    RDD

    , 但是一般情況下針對結構化資料的通路, 會先通過優化器優化一下

是以 

SparkContext

 确實已經不适合作為 

SparkSQL

 的入口, 是以剛開始的時候 

Spark

 團隊為 

SparkSQL

 設計了兩個入口點, 一個是 

SQLContext

 對應 

Spark

 标準的 

SQL

 執行, 另外一個是 

HiveContext

HiveSQL

 的執行和 

Hive

 的支援.

Spark 2.0

 的時候, 為了解決入口點不統一的問題, 建立了一個新的入口點 

SparkSession

, 作為整個 

Spark

 生态工具的統一入口點, 包括了 

SQLContext

HiveContext

SparkContext

 等元件的功能

新的入口應該有什麼特性?

  • 能夠整合 

    SQLContext

    HiveContext

    SparkContext

    StreamingContext

     等不同的入口點
  • 為了支援更多的資料源, 應該完善讀取和寫入體系
  • 同時對于原來的入口點也不能放棄, 要向下相容

DataFrame & Dataset

spark系列14:sparksql入門與介紹使用

SparkSQL

 最大的特點就是它針對于結構化資料設計, 是以 

SparkSQL

 應該是能支援針對某一個字段的通路的, 而這種通路方式有一個前提, 就是 

SparkSQL

 的資料集中, 要 包含結構化資訊, 也就是俗稱的 

Schema

SparkSQL

 對外提供的 

API

 有兩類, 一類是直接執行 

SQL

, 另外一類就是指令式. 

SparkSQL

 提供的指令式 

API

 就是 

DataFrame

Dataset

, 暫時也可以認為 

DataFrame

Dataset

, 隻是在不同的 

API

 中傳回的是 

Dataset

 的不同表現形式

// RDD
rdd.map { case Person(id, name, age) => (age, 1) }
  .reduceByKey {case ((age, count), (totalAge, totalCount)) => (age, count + totalCount)}

// DataFrame
df.groupBy("age").count("age")           

通過上面的代碼, 可以清晰的看到, 

SparkSQL

 的指令式操作相比于 

RDD

 來說, 可以直接通過 

Schema

 資訊來通路其中某個字段, 非常的友善

4. SQL 版本 WordCount

val spark: SparkSession = new sql.SparkSession.Builder()
  .appName("hello")
  .master("local[6]")
  .getOrCreate()

import spark.implicits._

val peopleRDD: RDD[People] = spark.sparkContext.parallelize(Seq(People("zhangsan", 9), People("lisi", 15)))
val peopleDS: Dataset[People] = peopleRDD.toDS()
peopleDS.createOrReplaceTempView("people")

val teenagers: DataFrame = spark.sql("select name from people where age > 10 and age < 20")

/*
+----+
|name|
+----+
|lisi|
+----+
 */
teenagers.show()           

以往使用 

SQL

 肯定是要有一個表的, 在 

Spark

 中, 并不存在表的概念, 但是有一個近似的概念, 叫做 

DataFrame

, 是以一般情況下要先通過 

DataFrame

 或者 

Dataset

 注冊一張臨時表, 然後使用 

SQL

 操作這張臨時表