1. SparkSQL 是什麼
目标
對于一件事的了解, 應該分為兩個大部分, 第一, 它是什麼, 第二, 它解決了什麼問題
- 了解為什麼會有
SparkSQL
- 了解
所解決的問題, 以及它的使命SparkSQL
1.1. SparkSQL 的出現契機
SparkSQL
是什麼
主線
- 曆史前提
- 發展過程
- 重要性
資料分析的方式
資料分析的方式大緻上可以劃分為
SQL
和 指令式兩種
指令式
在前面的
RDD
部分, 非常明顯可以感覺的到是指令式的, 主要特征是通過一個算子, 可以得到一個結果, 通過結果再進行後續計算.
sc.textFile("...")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.collect()
指令式的優點
- 操作粒度更細, 能夠控制資料的每一個處理環節
- 操作更明确, 步驟更清晰, 容易維護
- 支援非結構化資料的操作
指令式的缺點
- 需要一定的代碼功底
- 寫起來比較麻煩
SQL
對于一些資料科學家, 要求他們為了做一個非常簡單的查詢, 寫一大堆代碼, 明顯是一件非常殘忍的事情, 是以
SQL on Hadoop
是一個非常重要的方向.
SELECT
name,
age,
school
FROM students
WHERE age > 10
SQL 的優點
- 表達非常清晰, 比如說這段
明顯就是為了查詢三個字段, 又比如說這段SQL
明顯能看到是想查詢年齡大于 10 歲的條目SQL
SQL 的缺點
- 想想一下 3 層嵌套的
, 維護起來應該挺力不從心的吧SQL
- 試想一下, 如果使用
來實作機器學習算法, 也挺為難的吧SQL
SQL
擅長資料分析和通過簡單的文法表示查詢, 指令式操作适合過程式處理和算法性的處理. 在
Spark
出現之前, 對于結構化資料的查詢和處理, 一個工具一向隻能支援
SQL
或者指令式, 使用者被迫要使用多個工具來适應兩種場景, 并且多個工具配合起來比較費勁.
而
Spark
出現了以後, 統一了兩種資料處理範式, 是一種革新性的進步.
因為
SQL
是資料分析領域一個非常重要的範式, 是以
Spark
一直想要支援這種範式, 而伴随着一些決策失誤, 這個過程其實還是非常曲折的
Hive
解決的問題
-
實作了Hive
, 使用SQL on Hadoop
執行任務MapReduce
- 簡化了
任務MapReduce
新的問題
-
的查詢延遲比較高, 原因是使用Hive
做排程MapReduce
Shark
-
改寫Shark
的實體執行計劃, 使用Hive
作業代替Spark
執行實體計劃MapReduce
- 使用列式記憶體存儲
- 以上兩點使得
的查詢效率很高Shark
-
重用了Shark
的Hive
解析, 邏輯計劃生成以及優化, 是以其實可以認為SQL
隻是把Shark
的實體執行替換為了Hive
作業Spark
- 執行計劃的生成嚴重依賴
, 想要增加新的優化非常困難Hive
-
使用Hive
執行作業, 是以MapReduce
是程序級别的并行, 而Hive
是線程級别的并行, 是以Spark
中很多線程不安全的代碼不适用于Hive
Spark
由于以上問題,
Shark
維護了
Hive
的一個分支, 并且無法合并進主線, 難以為繼
SparkSQL
-
Spark SQL
解析Hive
生成SQL
文法樹, 将其後的邏輯計劃生成, 優化, 實體計劃都自己完成, 而不依賴AST
Hive
- 執行計劃和優化交給優化器
Catalyst
- 内建了一套簡單的
解析器, 可以不使用SQL
, 此外, 還引入和HQL
這樣的DataFrame
, 完全可以不依賴任何DSL API
的元件Hive
-
隻能查詢檔案,Shark
可以直接降查詢作用于Spark SQL
, 這一點是一個大進步RDD
對于初期版本的
SparkSQL
, 依然有挺多問題, 例如隻能支援
SQL
的使用, 不能很好的相容指令式, 入口不夠統一等
Dataset
SparkSQL
在 2.0 時代, 增加了一個新的
API
, 叫做
Dataset
,
Dataset
統一和結合了
SQL
的通路和指令式
API
的使用, 這是一個劃時代的進步
在
Dataset
中可以輕易的做到使用
SQL
查詢并且篩選資料, 然後使用指令式
API
進行探索式分析
不隻是一個 引擎, 也包含了一套對 結構化資料的指令式 , 事實上, 所有 中常見的工具, 都是依賴和依照于 設計的 |
總結:
SparkSQL
SparkSQL
是一個為了支援
SQL
而設計的工具, 但同時也支援指令式的
API
2. SparkSQL 的适用場景
SparkSQL
的适用場景
定義 | 特點 | 舉例 | |
---|---|---|---|
結構化資料 | 有固定的 | 有預定義的 | 關系型資料庫的表 |
半結構化資料 | 沒有固定的 , 但是有結構 | , 有結構資訊, 資料一般是自描述的 | 指一些有結構的檔案格式, 例如 |
非結構化資料 | 沒有固定 , 也沒有結構 | | 指文檔圖檔之類的格式 |
一般指資料有固定的
Schema
, 例如在使用者表中,
name
字段是
String
型, 那麼每一條資料的
name
字段值都可以當作
String
來使用
+----+--------------+---------------------------+-------+---------+
| id | name | url | alexa | country |
+----+--------------+---------------------------+-------+---------+
| 1 | Google | https://www.google.cm/ | 1 | USA |
| 2 | 淘寶 | https://www.taobao.com/ | 13 | CN |
| 3 | 菜鳥教程 | http://www.runoob.com/ | 4689 | CN |
| 4 | 微網誌 | http://weibo.com/ | 20 | CN |
| 5 | Facebook | https://www.facebook.com/ | 3 | USA |
+----+--------------+---------------------------+-------+---------+
一般指的是資料沒有固定的
Schema
, 但是資料本身是有結構的
{
"firstName": "John",
"lastName": "Smith",
"age": 25,
"phoneNumber":
[
{
"type": "home",
"number": "212 555-1234"
},
{
"type": "fax",
"number": "646 555-4567"
}
]
}
Schema
指的是半結構化資料是沒有固定的
Schema
的, 可以了解為沒有顯式指定
Schema
比如說一個使用者資訊的
JSON
檔案, 第一條資料的
phone_num
有可能是
String
, 第二條資料雖說應該也是
String
, 但是如果硬要指定為
BigInt
, 也是有可能的
因為沒有指定
Schema
, 沒有顯式的強制的限制
有結構
雖說半結構化資料是沒有顯式指定
Schema
的, 也沒有限制, 但是半結構化資料本身是有有隐式的結構的, 也就是資料自身可以描述自身
例如
JSON
檔案, 其中的某一條資料是有字段這個概念的, 每個字段也有類型的概念, 是以說
JSON
是可以描述自身的, 也就是資料本身攜帶有元資訊
SparkSQL
處理什麼資料的問題?
-
Spark
主要用于處理 非結構化資料 和 半結構化資料RDD
-
主要用于處理 結構化資料SparkSQL
SparkSQL
相較于
RDD
的優勢在哪?
-
提供了更好的外部資料源讀寫支援SparkSQL
- 因為大部分外部資料源是有結構化的, 需要在
之外有一個新的解決方案, 來整合這些結構化資料源RDD
- 因為大部分外部資料源是有結構化的, 需要在
-
提供了直接通路列的能力SparkSQL
-
主要用做于處理結構化資料, 是以其提供的SparkSQL
具有一些普通資料庫的能力API
-
SparkSQL
适用于什麼場景?
SparkSQL
适用于處理結構化資料的場景
本章總結
-
是一個即支援SparkSQL
又支援指令式資料處理的工具SQL
-
的主要适用場景是處理結構化資料SparkSQL
3. SparkSQL 初體驗
- 了解
SparkSQL
由哪些部分組成API
3.1. RDD 版本的 WordCount
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(config)
sc.textFile("hdfs://node01:8020/dataset/wordcount.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.collect
-
版本的代碼有一個非常明顯的特點, 就是它所處理的資料是基本類型的, 在算子中對整個資料進行處理RDD
3.2. 指令式 API 的入門案例
case class People(name: String, age: Int)
val spark: SparkSession = new sql.SparkSession.Builder()
.appName("hello")
.master("local[6]")
.getOrCreate()
import spark.implicits._
val peopleRDD: RDD[People] = spark.sparkContext.parallelize(Seq(People("zhangsan", 9), People("lisi", 15)))
val peopleDS: Dataset[People] = peopleRDD.toDS()
val teenagers: Dataset[String] = peopleDS.where('age > 10)
.where('age < 20)
.select('name)
.as[String]
/*
+----+
|name|
+----+
|lisi|
+----+
*/
teenagers.show()
SparkSQL 中有一個新的入口點, 叫做 SparkSession |
SparkSQL 中有一個新的類型叫做 Dataset |
SparkSQL 有能力直接通過字段名通路資料集, 說明 SparkSQL 的 API 中是攜帶 Schema 資訊的 |
SparkSession
SparkContext
作為
RDD
的建立者和入口, 其主要作用有如下兩點
- 建立
, 主要是通過讀取檔案建立RDD
RDD
- 監控和排程任務, 包含了一系列元件, 例如
DAGScheduler
TaskSheduler
為什麼無法使用
SparkContext
SparkSQL
的入口?
-
在讀取檔案的時候, 是不包含SparkContext
資訊的, 因為讀取出來的是Schema
RDD
-
在整合資料源如SparkContext
Cassandra
JSON
等的時候是不靈活的, 而Parquet
和DataFrame
一開始的設計目标就是要支援更多的資料源Dataset
-
的排程方式是直接排程SparkContext
, 但是一般情況下針對結構化資料的通路, 會先通過優化器優化一下RDD
是以
SparkContext
确實已經不适合作為
SparkSQL
的入口, 是以剛開始的時候
Spark
團隊為
SparkSQL
設計了兩個入口點, 一個是
SQLContext
對應
Spark
标準的
SQL
執行, 另外一個是
HiveContext
HiveSQL
的執行和
Hive
的支援.
Spark 2.0
的時候, 為了解決入口點不統一的問題, 建立了一個新的入口點
SparkSession
, 作為整個
Spark
生态工具的統一入口點, 包括了
SQLContext
HiveContext
SparkContext
等元件的功能
新的入口應該有什麼特性?
- 能夠整合
SQLContext
HiveContext
SparkContext
等不同的入口點StreamingContext
- 為了支援更多的資料源, 應該完善讀取和寫入體系
- 同時對于原來的入口點也不能放棄, 要向下相容
DataFrame & Dataset
SparkSQL
最大的特點就是它針對于結構化資料設計, 是以
SparkSQL
應該是能支援針對某一個字段的通路的, 而這種通路方式有一個前提, 就是
SparkSQL
的資料集中, 要 包含結構化資訊, 也就是俗稱的
Schema
SparkSQL
對外提供的
API
有兩類, 一類是直接執行
SQL
, 另外一類就是指令式.
SparkSQL
提供的指令式
API
就是
DataFrame
Dataset
, 暫時也可以認為
DataFrame
Dataset
, 隻是在不同的
API
中傳回的是
Dataset
的不同表現形式
// RDD
rdd.map { case Person(id, name, age) => (age, 1) }
.reduceByKey {case ((age, count), (totalAge, totalCount)) => (age, count + totalCount)}
// DataFrame
df.groupBy("age").count("age")
通過上面的代碼, 可以清晰的看到,
SparkSQL
的指令式操作相比于
RDD
來說, 可以直接通過
Schema
資訊來通路其中某個字段, 非常的友善
4. SQL 版本 WordCount
val spark: SparkSession = new sql.SparkSession.Builder()
.appName("hello")
.master("local[6]")
.getOrCreate()
import spark.implicits._
val peopleRDD: RDD[People] = spark.sparkContext.parallelize(Seq(People("zhangsan", 9), People("lisi", 15)))
val peopleDS: Dataset[People] = peopleRDD.toDS()
peopleDS.createOrReplaceTempView("people")
val teenagers: DataFrame = spark.sql("select name from people where age > 10 and age < 20")
/*
+----+
|name|
+----+
|lisi|
+----+
*/
teenagers.show()
以往使用
SQL
肯定是要有一個表的, 在
Spark
中, 并不存在表的概念, 但是有一個近似的概念, 叫做
DataFrame
, 是以一般情況下要先通過
DataFrame
或者
Dataset
注冊一張臨時表, 然後使用
SQL
操作這張臨時表