SQL的貢獻者從幾人到了幾十人,而且發展速度異常迅猛,究其原因,個人認為有以下2點:
1、整合:将SQL類型的查詢語言整合到 Spark 的核心RDD概念裡。這樣可以應用于多種任務,流處理,批處理,包括機器學習裡都可以引入Sql。
2、效率:因為Shark受到hive的程式設計模型限制,無法再繼續優化來适應Spark模型裡。
先來看一段簡單的Spark SQL程式:
1. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
2. import sqlContext._
3.case class Person(name: String, age: Int)
4.val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
5.people.registerAsTable("people")
6.val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
7.teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
程式前兩句1和2生成SQLContext,導入sqlContext下面的all,也就是運作SparkSQL的上下文環境。
程式3,4兩句是加載資料源注冊table
第6句是真正的入口,是sql函數,傳入一句sql,先會傳回一個SchemaRDD。這一步是lazy的,直到第七句的collect這個action執行時,sql才會執行。
SQLContext是執行SQL的上下文對象,首先來看一下它Hold的有哪些成員:
一個存儲<tableName,logicalPlan>的map結構,查找關系的目錄,系統資料庫,登出表,查詢表和邏輯計劃關系的類。

Parse 傳入的sql來對文法分詞,建構文法樹,傳回一個logical plan
logical plan的文法分析器
logical Plan的優化器
邏輯計劃,由catalyst的TreeNode組成,可以看到有3種文法樹
包含不同政策的優化政策來優化實體執行計劃
sql執行的環境上下文
就是這些對象組成了Spark SQL的運作時,看起來很酷,有靜态的metadata存儲,有分析器、優化器、邏輯計劃、實體計劃、執行運作時。
那這些對象是怎麼互相協作來執行sql語句的呢?
話不多說,先上圖,這個圖我用一個線上作圖工具process on話的,畫的不好,圖能達意就行:
核心元件都是綠色的方框,每一步流程的結果都是藍色的框框,調用的方法是橙色的框框。
先概括一下,大緻的執行流程是:
Parse SQL -> Analyze Logical Plan -> Optimize Logical Plan -> Generate Physical Plan -> Prepareed Spark Plan -> Execute SQL -> Generate RDD
更具體的執行流程:
sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 采用不同Strategies生成spark plan -> spark
plan(prepare) prepared spark plan -> call toRDD(execute()函數調用) 執行sql生成RDD
回到開始的程式,我們調用sql函數,其實是SQLContext裡的sql函數它的實作是new一個SchemaRDD,在生成的時候就調用parseSql方法了。
/**
* Executes a SQL query using Spark, returning the result as a SchemaRDD.
*
* @group userf
*/
def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))
結果是會生成一個邏輯計劃
@transient
protected[sql] val parser = new catalyst.SqlParser
protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)
當我們調用SchemaRDD裡面的collect方法時,則會初始化QueryExecution,開始啟動執行。
override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
我們可以很清晰的看到執行步驟:
protected abstract class QueryExecution {
def logical: LogicalPlan
lazy val analyzed = analyzer(logical) //首先分析器會分析邏輯計劃
lazy val optimizedPlan = optimizer(analyzed) //随後優化器去優化分析後的邏輯計劃
// TODO: Don't just pick the first one...
lazy val sparkPlan = planner(optimizedPlan).next() //根據政策生成plan實體計劃
// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) //最後生成已經準備好的Spark Plan
/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[Row] = executedPlan.execute() //最後調用toRDD方法執行任務将結果轉換為RDD
protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }
def simpleString: String = stringOrError(executedPlan)
override def toString: String =
s"""== Logical Plan ==
|${stringOrError(analyzed)}
|== Optimized Logical Plan ==
|${stringOrError(optimizedPlan)}
|== Physical Plan ==
|${stringOrError(executedPlan)}
""".stripMargin.trim
}
至此整個流程結束。
通過分析SQLContext我們知道了Spark SQL都包含了哪些元件,SqlParser,Parser,Analyzer,Optimizer,LogicalPlan,SparkPlanner(包含Physical Plan),QueryExecution.
通過調試代碼,知道了Spark SQL的執行流程:
sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan -> optimizer(optimize)optimized logical plan -> spark planner(use strategies
to plan)生成physical plan -> 采用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函數調用) 執行sql生成RDD
随後還會對裡面的每個元件對象進行研究,看看catalyst究竟做了哪些優化。