天天看點

Spark SQL Catalyst源碼分析之SqlParser

    Spark SQL的核心執行流程我們已經分析完畢,可以參見Spark SQL核心執行流程,下面我們來分析執行流程中各個核心元件的工作職責。

    本文先從入口開始分析,即如何解析SQL文本生成邏輯計劃的,主要設計的核心元件式SqlParser是一個SQL語言的解析器,用scala實作的Parser将解析的結果封裝為Catalyst TreeNode ,關于Catalyst這個架構後續文章會介紹。

一、SQL Parser入口

    Sql Parser 其實是封裝了scala.util.parsing.combinator下的諸多Parser,并結合Parser下的一些解析方法,構成了Catalyst的元件UnResolved Logical Plan。

    先來看流程圖:

Spark SQL Catalyst源碼分析之SqlParser

     一段SQL會經過SQL Parser解析生成UnResolved Logical Plan(包含UnresolvedRelation、 UnresolvedFunction、 UnresolvedAttribute)。

    在源代碼裡是:  

def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))//sql("select name,value from temp_shengli") 執行個體化一個SchemaRDD

protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql) //執行個體化SqlParser

class SqlParser extends StandardTokenParsers with PackratParsers {

  def apply(input: String): LogicalPlan = {  //傳入sql語句調用apply方法,input參數即sql語句
    // Special-case out set commands since the value fields can be
    // complex to handle without RegexParsers. Also this approach
    // is clearer for the several possible cases of set commands.
    if (input.trim.toLowerCase.startsWith("set")) {
      input.trim.drop(3).split("=", 2).map(_.trim) match {
        case Array("") => // "set"
          SetCommand(None, None)
        case Array(key) => // "set key"
          SetCommand(Some(key), None)
        case Array(key, value) => // "set key=value"
          SetCommand(Some(key), Some(value))
      }
    } else {
      phrase(query)(new lexical.Scanner(input)) match {
        case Success(r, x) => r
        case x => sys.error(x.toString)
      }
    }
  }
           

    1.  當我們調用sql("select name,value from temp_shengli")時,實際上是new了一個SchemaRDD

    2. new SchemaRDD時,構造方法調用parseSql方法,parseSql方法執行個體化了一個SqlParser,這個Parser初始化調用其apply方法。

    3. apply方法分支:

         3.1 如果sql指令是set開頭的就調用SetCommand,這個類似Hive裡的參數設定,SetCommand其實是一個Catalyst裡TreeNode之LeafNode,也是繼承自LogicalPlan,關于Catalyst的TreeNode庫這個暫不詳細介紹,後面會有文章來詳細講解。

         3.2 關鍵是else語句塊裡,才是SqlParser解析SQL的核心代碼:

phrase(query)(new lexical.Scanner(input)) match {
        case Success(r, x) => r
        case x => sys.error(x.toString)
      }
           

        可能 phrase方法大家很陌生,不知道是幹什麼的,那麼我們首先看一下SqlParser的類圖:

Spark SQL Catalyst源碼分析之SqlParser

      SqlParser類繼承了scala内置集合Parsers,這個Parsers。我們可以看到SqlParser現在是具有了分詞的功能,也能解析combiner的語句(類似p ~> q,後面會介紹)。

     Phrase方法:

/** A parser generator delimiting whole phrases (i.e. programs).
   *
   *  `phrase(p)` succeeds if `p` succeeds and no input is left over after `p`.
   *
   *  @param p the parser that must consume all input for the resulting parser
   *           to succeed.
   *  @return  a parser that has the same result as `p`, but that only succeeds
   *           if `p` consumed all the input.
   */
  def phrase[T](p: Parser[T]) = new Parser[T] {
    def apply(in: Input) = lastNoSuccessVar.withValue(None) {
      p(in) match {
      case s @ Success(out, in1) =>
        if (in1.atEnd)
          s
        else
            lastNoSuccessVar.value filterNot { _.next.pos < in1.pos } getOrElse Failure("end of input expected", in1)
        case ns => lastNoSuccessVar.value.getOrElse(ns)
      }
    }
  }
           

      Phrase是一個循環讀取輸入字元的方法,如果輸入in沒有到達最後一個字元,就繼續對parser進行解析,直到最後一個輸入字元。

     我們注意到Success這個類,出現在Parser裡, 在else塊裡最終傳回的也有Success:

/** The success case of `ParseResult`: contains the result and the remaining input.
   *
   *  @param result The parser's output
   *  @param next   The parser's remaining input
   */
  case class Success[+T](result: T, override val next: Input) extends ParseResult[T] {
           

    通過源碼可知,Success封裝了目前解析器的解析結果result, 和還沒有解析的語句。

   是以上面判斷了Success的解析結果中in1.atEnd? 如果輸入流結束了,就傳回s,即Success對象,這個Success包含了SqlParser解析的輸出。

二、Sql Parser核心

在SqlParser裡phrase接受2個參數:

第一個是query,一種帶模式的解析規則,傳回的是LogicalPlan。

第二個是lexical詞彙掃描輸入。

SqlParser parse的流程是,用lexical詞彙掃描接受SQL關鍵字,使用query模式來解析符合規則的SQL。

2.1 lexical keyword

在SqlParser裡定義了KeyWord這個類:

protected case class Keyword(str: String)
           

在我使用的spark1.0.0版本裡目前隻支援了一下SQL保留字:

protected val ALL = Keyword("ALL")
  protected val AND = Keyword("AND")
  protected val AS = Keyword("AS")
  protected val ASC = Keyword("ASC")
  protected val APPROXIMATE = Keyword("APPROXIMATE")
  protected val AVG = Keyword("AVG")
  protected val BY = Keyword("BY")
  protected val CACHE = Keyword("CACHE")
  protected val CAST = Keyword("CAST")
  protected val COUNT = Keyword("COUNT")
  protected val DESC = Keyword("DESC")
  protected val DISTINCT = Keyword("DISTINCT")
  protected val FALSE = Keyword("FALSE")
  protected val FIRST = Keyword("FIRST")
  protected val FROM = Keyword("FROM")
  protected val FULL = Keyword("FULL")
  protected val GROUP = Keyword("GROUP")
  protected val HAVING = Keyword("HAVING")
  protected val IF = Keyword("IF")
  protected val IN = Keyword("IN")
  protected val INNER = Keyword("INNER")
  protected val INSERT = Keyword("INSERT")
  protected val INTO = Keyword("INTO")
  protected val IS = Keyword("IS")
  protected val JOIN = Keyword("JOIN")
  protected val LEFT = Keyword("LEFT")
  protected val LIMIT = Keyword("LIMIT")
  protected val MAX = Keyword("MAX")
  protected val MIN = Keyword("MIN")
  protected val NOT = Keyword("NOT")
  protected val NULL = Keyword("NULL")
  protected val ON = Keyword("ON")
  protected val OR = Keyword("OR")
  protected val OVERWRITE = Keyword("OVERWRITE")
  protected val LIKE = Keyword("LIKE")
  protected val RLIKE = Keyword("RLIKE")
  protected val UPPER = Keyword("UPPER")
  protected val LOWER = Keyword("LOWER")
  protected val REGEXP = Keyword("REGEXP")
  protected val ORDER = Keyword("ORDER")
  protected val OUTER = Keyword("OUTER")
  protected val RIGHT = Keyword("RIGHT")
  protected val SELECT = Keyword("SELECT")
  protected val SEMI = Keyword("SEMI")
  protected val STRING = Keyword("STRING")
  protected val SUM = Keyword("SUM")
  protected val TABLE = Keyword("TABLE")
  protected val TRUE = Keyword("TRUE")
  protected val UNCACHE = Keyword("UNCACHE")
  protected val UNION = Keyword("UNION")
  protected val WHERE = Keyword("WHERE")
           

這裡根據這些保留字,反射,生成了一個SqlLexical

override val lexical = new SqlLexical(reservedWords)
           

SqlLexical利用它的Scanner這個Parser來讀取輸入,傳遞給query。

2.2 query

query的定義是Parser[LogicalPlan]  和 一堆奇怪的連接配接符(其實都是Parser的方法啦,看上圖),*,~,^^^,看起來很讓人費解。通過查閱讀源碼,以下列出幾個常用的:

|  is the alternation combinator. It says “succeed if either the left or right operand parse successfully” 

左邊算子和右邊的算子隻要有一個成功了,就傳回succeed,類似or

~ is the sequential combinator. It says “succeed if the left operand parses successfully, and then the right parses successfully on the remaining input”

左邊的算子成功後,右邊的算子對後續的輸入也計算成功,就傳回succeed

opt  `opt(p)` is a parser that returns `Some(x)` if `p` returns `x` and `None` if `p` fails.

如果p算子成功則傳回則傳回Some(x) 如果p算子失敗,傳回fails

^^^ `p ^^^ v` succeeds if `p` succeeds; discards its result, and returns `v` instead.

如果左邊的算子成功,取消左邊算子的結果,傳回右邊算子。

~> says “succeed if the left operand parses successfully followed by the right, but do not include the left content in the result”

如果左邊的算子和右邊的算子都成功了,傳回的結果中不包含左邊的傳回值。

  protected lazy val limit: Parser[Expression] =

    LIMIT ~> expression

<~ is the reverse, “succeed if the left operand is parsed successfully followed by the right, but do not include the right content in the result”

這個和~>操作符的意思相反,如果左邊的算子和右邊的算子都成功了,傳回的結果中不包含右邊的

    termExpression <~ IS ~ NOT ~ NULL ^^ { case e => IsNotNull(e) } |

^^{} 或者 ^^=> is the transformation combinator. It says “if the left operand parses successfully, transform the result using the function on the right”

rep => simply says “expect N-many repetitions of parser X” where X is the parser passed as an argument to rep

變形連接配接符,意思是如果左邊的算子成功了,用^^右邊的算子函數作用于傳回的結果

接下來看query的定義:

protected lazy val query: Parser[LogicalPlan] = (
    select * (
        UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
        UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
      )
    | insert | cache
  )
           

沒錯,傳回的是一個Parser,裡面的類型是LogicalPlan。 query的定義其實是一種模式,用到了上述的諸多操作符,如|, ^^, ~> 等等 給定一種sql模式,如select,select xxx from yyy where ccc =ddd  如果比對這種寫法,則傳回Success,否則傳回Failure.

這裡的模式是select 模式後面可以接union all 或者 union distinct。 即如下書寫式合法的,否則出錯。  

select a,b from c 
union all
select e,f from g
           

這個 *号是一個repeat符号,即可以支援多個union all 子句。

看來目前spark1.0.0隻支援這三種模式,即select, insert, cache。

那到底是怎麼生成LogicalPlan的呢? 我們再看一個詳細的:

protected lazy val select: Parser[LogicalPlan] =
    SELECT ~> opt(DISTINCT) ~ projections ~
    opt(from) ~ opt(filter) ~
    opt(grouping) ~
    opt(having) ~
    opt(orderBy) ~
    opt(limit) <~ opt(";") ^^ {
      case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l  =>
        val base = r.getOrElse(NoRelation)
        val withFilter = f.map(f => Filter(f, base)).getOrElse(base)
        val withProjection =
          g.map {g =>
            Aggregate(assignAliases(g), assignAliases(p), withFilter)
          }.getOrElse(Project(assignAliases(p), withFilter))
        val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
        val withHaving = h.map(h => Filter(h, withDistinct)).getOrElse(withDistinct)
        val withOrder = o.map(o => Sort(o, withHaving)).getOrElse(withHaving)
        val withLimit = l.map { l => Limit(l, withOrder) }.getOrElse(withOrder)
        withLimit
  }
           

這裡我給稱它為select模式。 看這個select語句支援什麼模式的寫法: select  distinct  projections from filter grouping having orderBy limit. 

給出一個符合的該select 模式的sql, 注意到 帶opt連接配接符的是可選的,可以寫distinct也可以不寫。

select  game_id, user_name from game_log where date<='2014-07-19' and user_name='shengli' group by game_id having game_id > 1 orderBy game_id limit 50.
           

projections是什麼呢? 其實是一個表達式,是一個Seq類型,一連串的表達式可以使 game_id也可以是 game_id AS gmid 。 傳回的确實是一個Expression,是Catalyst裡TreeNode。

protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")

  protected lazy val projection: Parser[Expression] =
    expression ~ (opt(AS) ~> opt(ident)) ^^ {
      case e ~ None => e
      case e ~ Some(a) => Alias(e, a)()
    }
           

模式裡 from是什麼的? 其實是一個relations,就是一個關系,在SQL裡可以是表,表join表

protected lazy val from: Parser[LogicalPlan] = FROM ~> relations
           
protected lazy val relation: Parser[LogicalPlan] =
    joinedRelation |
    relationFactor

  protected lazy val relationFactor: Parser[LogicalPlan] =
    ident ~ (opt(AS) ~> opt(ident)) ^^ {
      case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
    } |
    "(" ~> query ~ ")" ~ opt(AS) ~ ident ^^ { case s ~ _ ~ _ ~ a => Subquery(a, s) }

   protected lazy val joinedRelation: Parser[LogicalPlan] =
     relationFactor ~ opt(joinType) ~ JOIN ~ relationFactor ~ opt(joinConditions) ^^ {
      case r1 ~ jt ~ _ ~ r2 ~ cond =>
        Join(r1, r2, joinType = jt.getOrElse(Inner), cond)
     }
           

這裡看出來,其實就是table之間的操作,但是傳回的Subquery确實是一個LogicalPlan

case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
  override def output = child.output.map(_.withQualifiers(alias :: Nil))
  override def references = Set.empty
}
           

scala裡的文法糖很多,這樣寫的确比較友善,但是對初學者可能有點晦澀了。

至此我們知道,SqlParser是怎麼生成LogicalPlan的了。

三、總結

    本文從源代碼剖析了Spark Catalyst 是如何将Sql解析成Unresolved邏輯計劃(包含UnresolvedRelation、 UnresolvedFunction、 UnresolvedAttribute)的。     sql文本作為輸入,執行個體化了SqlParser,SqlParser的apply方法被調用,分别處理2種輸入,一種是指令參數,一種是sql。對應指令參數的會生成一個葉子節點,SetCommand,對于sql語句,會調用Parser的phrase方法,由lexical的Scanner來掃描輸入,分詞,最後由query這個由我們定義好的sql模式利用parser的連接配接符來驗證是否符合sql标準,如果符合則随即生成LogicalPlan文法樹,不符合則會提示解析失敗。     通過對spark catalyst sql parser的解析,使我了解了,sql語言的文法标準是如何實作的和如何解析sql生成邏輯計劃文法樹。

——EOF——

原創文章,轉載請注明:

轉載自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文連結位址:http://blog.csdn.net/oopsoom/article/details/37943507

注:本文基于署名-非商業性使用-禁止演繹 2.5 中國大陸(CC BY-NC-ND 2.5 CN)協定,歡迎轉載、轉發和評論,但是請保留本文作者署名和文章連結。如若需要用于商業目的或者與授權方面的協商,請聯系我。

Spark SQL Catalyst源碼分析之SqlParser

繼續閱讀