天天看點

Spark-SparkSQL深入學習系列一(轉自OopsOutOfMemory)

SQL的貢獻者從幾人到了幾十人,而且發展速度異常迅猛,究其原因,個人認為有以下2點:

    1、整合:将SQL類型的查詢語言整合到 Spark 的核心RDD概念裡。這樣可以應用于多種任務,流處理,批處理,包括機器學習裡都可以引入Sql。

    2、效率:因為Shark受到hive的程式設計模型限制,無法再繼續優化來适應Spark模型裡。

先來看一段簡單的Spark SQL程式:

1. val sqlContext = new org.apache.spark.sql.SQLContext(sc)  

2. import sqlContext._  

3.case class Person(name: String, age: Int)  

4.val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))  

5.people.registerAsTable("people")  

6.val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")  

7.teenagers.map(t => "Name: " + t(0)).collect().foreach(println)  

程式前兩句1和2生成SQLContext,導入sqlContext下面的all,也就是運作SparkSQL的上下文環境。

程式3,4兩句是加載資料源注冊table

第6句是真正的入口,是sql函數,傳入一句sql,先會傳回一個SchemaRDD。這一步是lazy的,直到第七句的collect這個action執行時,sql才會執行。

SQLContext是執行SQL的上下文對象,首先來看一下它Hold的有哪些成員:

 一個存儲<tableName,logicalPlan>的map結構,查找關系的目錄,系統資料庫,登出表,查詢表和邏輯計劃關系的類。

Spark-SparkSQL深入學習系列一(轉自OopsOutOfMemory)

 Parse 傳入的sql來對文法分詞,建構文法樹,傳回一個logical plan

Spark-SparkSQL深入學習系列一(轉自OopsOutOfMemory)

  logical plan的文法分析器

Spark-SparkSQL深入學習系列一(轉自OopsOutOfMemory)

 logical Plan的優化器

Spark-SparkSQL深入學習系列一(轉自OopsOutOfMemory)

邏輯計劃,由catalyst的TreeNode組成,可以看到有3種文法樹

Spark-SparkSQL深入學習系列一(轉自OopsOutOfMemory)

包含不同政策的優化政策來優化實體執行計劃

Spark-SparkSQL深入學習系列一(轉自OopsOutOfMemory)

sql執行的環境上下文

Spark-SparkSQL深入學習系列一(轉自OopsOutOfMemory)

就是這些對象組成了Spark SQL的運作時,看起來很酷,有靜态的metadata存儲,有分析器、優化器、邏輯計劃、實體計劃、執行運作時。

那這些對象是怎麼互相協作來執行sql語句的呢?

話不多說,先上圖,這個圖我用一個線上作圖工具process on話的,畫的不好,圖能達意就行:

Spark-SparkSQL深入學習系列一(轉自OopsOutOfMemory)

核心元件都是綠色的方框,每一步流程的結果都是藍色的框框,調用的方法是橙色的框框。

先概括一下,大緻的執行流程是:

Parse SQL -> Analyze Logical Plan -> Optimize Logical Plan -> Generate Physical Plan -> Prepareed Spark Plan -> Execute SQL -> Generate RDD

更具體的執行流程:

     sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan  -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 采用不同Strategies生成spark plan -> spark

plan(prepare) prepared spark plan -> call toRDD(execute()函數調用) 執行sql生成RDD

 回到開始的程式,我們調用sql函數,其實是SQLContext裡的sql函數它的實作是new一個SchemaRDD,在生成的時候就調用parseSql方法了。

/** 

* Executes a SQL query using Spark, returning the result as a SchemaRDD. 

* @group userf 

*/  

def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))  

   結果是會生成一個邏輯計劃

 @transient  

protected[sql] val parser = new catalyst.SqlParser    

protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)  

當我們調用SchemaRDD裡面的collect方法時,則會初始化QueryExecution,開始啟動執行。

override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()  

我們可以很清晰的看到執行步驟:

protected abstract class QueryExecution {  

    def logical: LogicalPlan  

    lazy val analyzed = analyzer(logical)  //首先分析器會分析邏輯計劃  

    lazy val optimizedPlan = optimizer(analyzed) //随後優化器去優化分析後的邏輯計劃  

    // TODO: Don't just pick the first one...  

    lazy val sparkPlan = planner(optimizedPlan).next() //根據政策生成plan實體計劃  

    // executedPlan should not be used to initialize any SparkPlan. It should be  

    // only used for execution.  

    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) //最後生成已經準備好的Spark Plan  

    /** Internal version of the RDD. Avoids copies and has no schema */  

    lazy val toRdd: RDD[Row] = executedPlan.execute() //最後調用toRDD方法執行任務将結果轉換為RDD  

    protected def stringOrError[A](f: => A): String =  

      try f.toString catch { case e: Throwable => e.toString }  

    def simpleString: String = stringOrError(executedPlan)  

    override def toString: String =  

      s"""== Logical Plan ==  

         |${stringOrError(analyzed)}  

         |== Optimized Logical Plan ==  

         |${stringOrError(optimizedPlan)}  

         |== Physical Plan ==  

         |${stringOrError(executedPlan)}  

      """.stripMargin.trim  

  }  

至此整個流程結束。

  通過分析SQLContext我們知道了Spark SQL都包含了哪些元件,SqlParser,Parser,Analyzer,Optimizer,LogicalPlan,SparkPlanner(包含Physical Plan),QueryExecution.

  通過調試代碼,知道了Spark SQL的執行流程:

sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan  -> optimizer(optimize)optimized logical plan -> spark planner(use strategies

to plan)生成physical plan -> 采用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函數調用) 執行sql生成RDD

  随後還會對裡面的每個元件對象進行研究,看看catalyst究竟做了哪些優化。