
Spark SQL Catalyst Source Code Analysis: Optimizer

  The previous articles covered Catalyst's core execution flow in Spark SQL, the SqlParser, the Analyzer, and the core TreeNode library. This article walks through the optimization ideas behind Spark SQL's Optimizer and how the Optimizer is expressed inside Catalyst, together with some hands-on experiments, to build an intuitive picture of it.

  The Optimizer's main responsibility is to take the Logical Plan resolved by the Analyzer and optimize the syntax tree according to the different optimization-strategy Batches, optimizing both Logical Plan nodes and Expressions; it is also the step that precedes conversion into a physical execution plan. As shown in the figure below:

[Figure: the Optimizer's position in the Catalyst pipeline]

1. Optimizer

  Optimizer is the only class under catalyst's optimizer package. It works much like the Analyzer: both extend RuleExecutor[LogicalPlan], and both execute a series of Batch operations:


  The batches in Optimizer cover three groups of optimization strategies: 1. Combine Limits (merging Limit operators); 2. ConstantFolding (constant folding); 3. Filter Pushdown (predicate pushdown). The rule objects used by each Batch are all defined in the Optimizer file:

object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Combine Limits", FixedPoint(100),
      CombineLimits) ::
    Batch("ConstantFolding", FixedPoint(100),
      NullPropagation,
      ConstantFolding,
      BooleanSimplification,
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions) ::
    Batch("Filter Pushdown", FixedPoint(100),
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughJoin,
      ColumnPruning) :: Nil
}
           

  One more thing: the Optimizer optimizes not only the Logical Plan but also the Expressions inside it, so it is worth knowing the Expression-related members it relies on, chiefly references and outputSet. references is the set of Expressions that a Logical Plan or Expression node depends on, while outputSet is the set of all Attributes the Logical Plan outputs.

  For example, Aggregate is a Logical Plan; its references are the de-duplicated union of the references of its group-by expressions and its aggregate expressions.

case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalPlan)
  extends UnaryNode {

  override def output = aggregateExpressions.map(_.toAttribute)
  override def references =
    (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
}
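To make references and outputSet concrete, here is a minimal toy model of the idea. The classes below are illustrative stand-ins, not Catalyst's actual Attribute/Expression classes:

```scala
// Toy stand-ins: an attribute, and an expression that knows which
// attributes it depends on (a simplified "references").
case class Attribute(name: String)
case class Expr(refs: Set[Attribute])

case class Aggregate(groupingExprs: Seq[Expr], aggregateExprs: Seq[Expr]) {
  // references = union of the group-by and aggregate expressions'
  // dependencies; converting to a Set de-duplicates them
  def references: Set[Attribute] =
    (groupingExprs ++ aggregateExprs).flatMap(_.refs).toSet
}
```

Both the group-by key and an aggregate expression may reference the same attribute; the conversion to a Set is what performs the de-duplication mentioned above.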
           

2. Optimization Strategies in Detail

  The Optimizer's strategies transform not only plans but also expressions; the underlying principle in both cases is to traverse the tree and apply the optimization Rules. Note one detail: Logical Plans are transformed in pre-order, while Expressions are transformed in post-order:

2.1 Batch: Combine Limits

If two Limit operators appear, they are merged into one. The rule requires them to be adjacent: in the pattern below the outer Limit's child is itself a Limit, whose own child is bound as grandChild.

/**
 * Combines two adjacent [[Limit]] operators into one, merging the
 * expressions into one single expression.
 */
object CombineLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ll @ Limit(le, nl @ Limit(ne, grandChild)) => // ll is the outer Limit with expression le; nl is its child Limit with expression ne; grandChild is nl's child
      Limit(If(LessThan(ne, le), ne, le), grandChild) // compare the two expressions: take ne if it is smaller than le, otherwise le
  }
}
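The merging performed by CombineLimits can be sketched on a toy plan tree. These case classes are hypothetical stand-ins (with a plain Int bound instead of a Catalyst expression), not the real operators:

```scala
// Minimal stand-in plan nodes, just to show the rewrite.
sealed trait Plan
case class Relation(name: String) extends Plan
case class Limit(n: Int, child: Plan) extends Plan

// Collapse adjacent Limits into one, keeping the smaller bound; applied
// repeatedly, mimicking the FixedPoint(100) strategy of the real batch.
def combineLimits(plan: Plan): Plan = plan match {
  case Limit(outer, Limit(inner, grandChild)) =>
    combineLimits(Limit(math.min(outer, inner), grandChild))
  case Limit(n, child) => Limit(n, combineLimits(child))
  case other           => other
}
```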
           

Given the SQL: val query = sql("select * from (select * from temp_shengli limit 100)a limit 10 ")

scala> query.queryExecution.analyzed
res12: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Limit 10
 Project [key#13,value#14]
  Limit 100
   Project [key#13,value#14]
    MetastoreRelation default, temp_shengli, None
           

The subquery has limit 100 and the outer query limit 10. There is no need to fetch that many rows in the subquery, since the outer query only needs 10, so Limit 10 and Limit 100 are merged into Limit 10.

2.2 Batch: ConstantFolding

  This Batch contains the Rules: NullPropagation, ConstantFolding, BooleanSimplification, SimplifyFilters, SimplifyCasts, and SimplifyCaseConversionExpressions.

2.2.1 Rule: NullPropagation

  First, a word about Literal: it is a class that can match a value of any basic type (groundwork for what follows).

object Literal {
  def apply(v: Any): Literal = v match {
    case i: Int => Literal(i, IntegerType)
    case l: Long => Literal(l, LongType)
    case d: Double => Literal(d, DoubleType)
    case f: Float => Literal(f, FloatType)
    case b: Byte => Literal(b, ByteType)
    case s: Short => Literal(s, ShortType)
    case s: String => Literal(s, StringType)
    case b: Boolean => Literal(b, BooleanType)
    case d: BigDecimal => Literal(d, DecimalType)
    case t: Timestamp => Literal(t, TimestampType)
    case a: Array[Byte] => Literal(a, BinaryType)
    case null => Literal(null, NullType)
  }
}
           

  Note that Literal is a LeafExpression whose core method is eval: given a Row, it evaluates the expression and returns its value:

case class Literal(value: Any, dataType: DataType) extends LeafExpression {
  override def foldable = true
  def nullable = value == null
  def references = Set.empty
  override def toString = if (value != null) value.toString else "null"
  type EvaluatedType = Any
  override def eval(input: Row):Any = value
}
           

  Now let's look at what NullPropagation actually does.

  NullPropagation is an optimization that replaces statically evaluable Expressions with equivalent Literal values; in particular, it stops NULL values from propagating up the SQL syntax tree.

/**
 * Replaces [[Expression Expressions]] that can be statically evaluated with
 * equivalent [[Literal]] values. This rule is more specific with
 * Null value propagation from bottom to top of the expression tree.
 */
object NullPropagation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case e @ Count(Literal(null, _)) => Cast(Literal(0L), e.dataType) // count(null) is replaced by a literal 0
      case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType) // sum over a constant 0 folds to a literal 0
      case e @ Average(Literal(c, _)) if c == 0 => Literal(0.0, e.dataType)
      case e @ IsNull(c) if !c.nullable => Literal(false, BooleanType)
      case e @ IsNotNull(c) if !c.nullable => Literal(true, BooleanType)
      case e @ GetItem(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ GetItem(_, Literal(null, _)) => Literal(null, e.dataType)
      case e @ GetField(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ Coalesce(children) => {
        val newChildren = children.filter(c => c match {
          case Literal(null, _) => false
          case _ => true
        })
        if (newChildren.length == 0) {
          Literal(null, e.dataType)
        } else if (newChildren.length == 1) {
          newChildren(0)
        } else {
          Coalesce(newChildren)
        }
      }
      case e @ If(Literal(v, _), trueValue, falseValue) => if (v == true) trueValue else falseValue
      case e @ In(Literal(v, _), list) if (list.exists(c => c match {
          case Literal(candidate, _) if candidate == v => true
          case _ => false
        })) => Literal(true, BooleanType)
      // Put exceptional cases above if any
      case e: BinaryArithmetic => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: BinaryComparison => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: StringRegexExpression => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
    }
  }
}
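The BinaryArithmetic part of the rule, i.e. a null literal on either side making the whole expression null, can be sketched on a toy expression tree (stand-in classes, not Catalyst's):

```scala
// Toy expressions: a literal holds Option[Int], with None modelling SQL NULL.
sealed trait Expr
case class Lit(value: Option[Int]) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

// Bottom-up (post-order) rewrite: if either side of an arithmetic op is a
// known null literal, the whole subtree collapses to a null literal.
def propagateNulls(e: Expr): Expr = e match {
  case Add(l, r) =>
    (propagateNulls(l), propagateNulls(r)) match {
      case (Lit(None), _) => Lit(None)
      case (_, Lit(None)) => Lit(None)
      case (nl, nr)       => Add(nl, nr)
    }
  case other => other
}
```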
           

Given the SQL: val query = sql("select count(null) from temp_shengli where key is not null")

scala> query.queryExecution.analyzed
res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Aggregate [], [COUNT(null) AS c0#5L] // counting null here
 Filter IS NOT NULL key#7
  MetastoreRelation default, temp_shengli, None
           

Applying NullPropagation by hand:

scala> NullPropagation(query.queryExecution.analyzed)
res7: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Aggregate [], [CAST(0, LongType) AS c0#5L]  // optimized to 0
 Filter IS NOT NULL key#7
  MetastoreRelation default, temp_shengli, None
           

2.2.2 Rule: ConstantFolding

  Constant folding is an Expression-level optimization: constants that can be computed directly are evaluated within the plan itself, instead of generating objects at physical execution time to compute them:

object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform { // first transform the plan
    case q: LogicalPlan => q transformExpressionsDown { // then transform each node's expressions
      // Skip redundant folding of literals.
      case l: Literal => l
      case e if e.foldable => Literal(e.eval(null), e.dataType) // foldable: call eval to compute the result
    }
  }
}
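The folding idea can be sketched self-contained, using a toy expression tree that carries a foldable flag much like Catalyst's Expression does (the classes are stand-ins):

```scala
// Toy expressions: a node is foldable when it contains no column references.
sealed trait Expr { def foldable: Boolean }
case class Lit(v: Int) extends Expr { val foldable = true }
case class Col(name: String) extends Expr { val foldable = false }
case class Add(l: Expr, r: Expr) extends Expr {
  def foldable = l.foldable && r.foldable
}

def eval(e: Expr): Int = e match {
  case Lit(v)    => v
  case Add(l, r) => eval(l) + eval(r)
  case Col(_)    => sys.error("cannot evaluate a column statically")
}

def constantFold(e: Expr): Expr = e match {
  case l: Lit            => l              // skip redundant folding of literals
  case e2 if e2.foldable => Lit(eval(e2))  // statically evaluable: fold to a literal
  case Add(l, r)         => Add(constantFold(l), constantFold(r))
  case other             => other
}
```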
           

Given the SQL: val query = sql("select 1+2+3+4 from temp_shengli")

scala> query.queryExecution.analyzed
res23: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [(((1 + 2) + 3) + 4) AS c0#21]  // still a constant expression here
 MetastoreRelation default, src, None
           

After optimization:

scala> query.queryExecution.optimizedPlan
res24: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [10 AS c0#21] // after optimization, folded directly to 10
 MetastoreRelation default, src, None
           

2.2.3 Rule: BooleanSimplification

 This optimizes boolean expressions, a bit like short-circuit evaluation of boolean expressions in Java, and it is written rather elegantly.

 The idea: if one side of a boolean expression alone determines the result, the other side need not be evaluated; this is what simplifying the boolean expression means.

 See my comments in the code:

/**
 * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
 * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
 * is only safe when evaluations of expressions does not result in side effects.
 */
object BooleanSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case and @ And(left, right) => // an AND expression: exp1 AND exp2
        (left, right) match { // (left expression, right expression)
          case (Literal(true, BooleanType), r) => r // left is true: the result is the right side
          case (l, Literal(true, BooleanType)) => l // right is true: the result is the left side
          case (Literal(false, BooleanType), _) => Literal(false) // left is false: false, regardless of the right side
          case (_, Literal(false, BooleanType)) => Literal(false) // either side false makes the whole AND false
          case (_, _) => and
        }

      case or @ Or(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), _) => Literal(true) // left is true: no need to evaluate the right side
          case (_, Literal(true, BooleanType)) => Literal(true) // either side true makes the whole OR true
          case (Literal(false, BooleanType), r) => r // left is false: the result is the right side
          case (l, Literal(false, BooleanType)) => l // right is false: the result is the left side
          case (_, _) => or
        }
    }
  }
}
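The same truth-table shortcuts can be sketched on a toy boolean expression tree (hypothetical stand-in classes):

```scala
sealed trait BExpr
case class BLit(v: Boolean) extends BExpr
case class Var(name: String) extends BExpr  // an unknown, e.g. a column value
case class BAnd(l: BExpr, r: BExpr) extends BExpr
case class BOr(l: BExpr, r: BExpr) extends BExpr

// Simplify bottom-up: a true/false literal on one side either decides the
// result outright or makes the other side the result.
def simplify(e: BExpr): BExpr = e match {
  case BAnd(l, r) => (simplify(l), simplify(r)) match {
    case (BLit(true), sr)  => sr           // true AND r  => r
    case (sl, BLit(true))  => sl           // l AND true  => l
    case (BLit(false), _)  => BLit(false)  // false AND _ => false
    case (_, BLit(false))  => BLit(false)
    case (sl, sr)          => BAnd(sl, sr)
  }
  case BOr(l, r) => (simplify(l), simplify(r)) match {
    case (BLit(true), _)   => BLit(true)   // true OR _ => true
    case (_, BLit(true))   => BLit(true)
    case (BLit(false), sr) => sr           // false OR r => r
    case (sl, BLit(false)) => sl
    case (sl, sr)          => BOr(sl, sr)
  }
  case other => other
}
```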
           

2.3 Batch: Filter Pushdown

Filter Pushdown contains CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, and ColumnPruning. PS: the name Filter Pushdown feels a bit too narrow for the whole batch; ColumnPruning (column pruning), for instance, is not a filter pushdown.

2.3.1 Combine Filters

 Merges two adjacent Filters, much like Combine Limits above. Merging the two nodes reduces the depth of the tree and with it the cost of executing the filtering step twice.

/**
 * Combines two adjacent [[Filter]] operators into one, merging the
 * conditions into one conjunctive predicate.
 */
object CombineFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
  }
}
           

Given the SQL: val query = sql("select key from (select key from temp_shengli where key >100)a where key > 80 ")

Before optimization: note that one Filter is the grandchild of the other (a Project sits between them)

scala> query.queryExecution.analyzed
res25: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [key#27]
 Filter (key#27 > 80) //filter>80
  Project [key#27]
   Filter (key#27 > 100) //filter>100
    MetastoreRelation default, src, None
           

After optimization: a Filter condition can itself be a complex boolean expression, so the two are merged into one:

scala> query.queryExecution.optimizedPlan
res26: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [key#27]
 Filter ((key#27 > 100) && (key#27 > 80)) // merged into a single Filter
  MetastoreRelation default, src, None
           

2.3.2 Filter Pushdown

  Filter Pushdown pushes the filter operator down the tree.

  The principle is simple: filter out unneeded rows as early as possible, reducing the work done by the operators above.

  Given the SQL: val query = sql("select key from (select * from temp_shengli)a where key>100")

  The generated logical plan is:

scala> query.queryExecution.analyzed
res29: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [key#31]
 Filter (key#31 > 100) // select key and value first, then Filter
  Project [key#31,value#32]
   MetastoreRelation default, src, None
           

 The optimized plan is:

query.queryExecution.optimizedPlan
res30: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [key#31]
 Filter (key#31 > 100) // filter first, then select
  MetastoreRelation default, src, None
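The swap performed here can be sketched on a toy plan tree. These are hypothetical stand-ins; the real PushPredicateThroughProject additionally handles aliases in the project list:

```scala
// Toy plan nodes; columns are plain strings.
sealed trait Plan { def output: Set[String] }
case class Relation(name: String, cols: Set[String]) extends Plan {
  def output = cols
}
case class Project(cols: Set[String], child: Plan) extends Plan {
  def output = cols
}
case class Filter(refs: Set[String], child: Plan) extends Plan {
  def output = child.output
}

// Push a Filter beneath a Project when every column the predicate
// references is produced by the Project's child.
def pushDown(plan: Plan): Plan = plan match {
  case Filter(refs, Project(cols, child)) if refs.subsetOf(child.output) =>
    Project(cols, pushDown(Filter(refs, child)))
  case Filter(refs, child)  => Filter(refs, pushDown(child))
  case Project(cols, child) => Project(cols, pushDown(child))
  case other                => other
}
```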
           

2.3.3 ColumnPruning

  Column pruning is used heavily: it drops columns that are selected but never needed. It applies in three places:

  1. in aggregation operations;
  2. on the left and right children of a Join;
  3. when merging the columns of adjacent Projects.

object ColumnPruning extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Eliminate attributes that are not needed to calculate the specified aggregates.
    case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty => // the child outputs attributes the Aggregate never references, so narrow the child to a.references
      a.copy(child = Project(a.references.toSeq, child))

    // Eliminate unneeded attributes from either side of a Join.
    case Project(projectList, Join(left, right, joinType, condition)) => // prune unneeded attributes from the left and right children of the Join
      // Collect the list of references required either above or to evaluate the condition.
      val allReferences: Set[Attribute] =
        projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)

      /** Applies a projection only when the child is producing unnecessary attributes */
      def prunedChild(c: LogicalPlan) =
        if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
          Project(allReferences.filter(c.outputSet.contains).toSeq, c)
        } else {
          c
        }
      Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

    // Combine adjacent Projects.
    case Project(projectList1, Project(projectList2, child)) => // merge the columns of adjacent Projects
      // Create a map of Aliases to their values from the child projection.
      // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
      val aliasMap = projectList2.collect {
        case a @ Alias(e, _) => (a.toAttribute: Expression, a)
      }.toMap

      // Substitute any attributes that are produced by the child projection, so that we safely
      // eliminate it.
      // e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 'SELECT a + b + 1 ...'
      // TODO: Fix TransformBase to avoid the cast below.
      val substitutedProjection = projectList1.map(_.transform {
        case a if aliasMap.contains(a) => aliasMap(a)
      }).asInstanceOf[Seq[NamedExpression]]

      Project(substitutedProjection, child)

    // Eliminate no-op Projects
    case Project(projectList, child) if child.output == projectList => child
  }
}
           

Here are three examples, one for each case.

1. Column pruning in an aggregation. Given the SQL: val query = sql("SELECT 1+1 as shengli, key from (select key, value from temp_shengli)a group by key")

Before optimization:

res57: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
 Project [key#51,value#52] // before optimization, both key and value are selected by default
  MetastoreRelation default, temp_shengli, None
           

After optimization:

scala> ColumnPruning1(query.queryExecution.analyzed)
MetastoreRelation default, temp_shengli, None
res59: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
 Project [key#51]  // after pruning, value is dropped and only key is selected
  MetastoreRelation default, temp_shengli, None
           

2. Column pruning on the left and right children of a Join

Given the SQL: val query = sql("select a.value qween from (select * from temp_shengli) a join (select * from temp_shengli)b  on a.key =b.key ")

Before optimization:

scala> query.queryExecution.analyzed
res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [value#42 AS qween#39]
 Join Inner, Some((key#41 = key#43))
  Project [key#41,value#42]  // an extra column, value, is selected here
   MetastoreRelation default, temp_shengli, None
  Project [key#43,value#44]  // an extra column, value, is selected here
   MetastoreRelation default, temp_shengli, None
           

After optimization (ColumnPruning2 is a variant I used for debugging):

scala> ColumnPruning2(query.queryExecution.analyzed)
allReferences is -> Set(key#35, key#37)
MetastoreRelation default, temp_shengli, None
MetastoreRelation default, temp_shengli, None
res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [key#35 AS qween#33]
 Join Inner, Some((key#35 = key#37))
  Project [key#35]   // after pruning, the left child only needs to select the key column
   MetastoreRelation default, temp_shengli, None
  Project [key#37]   // after pruning, the right child only needs to select the key column
   MetastoreRelation default, temp_shengli, None
           

3. Merging and pruning the columns of adjacent Projects

Given the SQL: val query = sql("SELECT c + 1 FROM (SELECT 1 + 1 as c from temp_shengli ) a ")

Before optimization:

scala> query.queryExecution.analyzed
res61: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [(c#56 + 1) AS c0#57]
 Project [(1 + 1) AS c#56]
  MetastoreRelation default, temp_shengli, None
           

After optimization:

scala> query.queryExecution.optimizedPlan
res62: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [(2 AS c#56 + 1) AS c0#57] // the subquery's c is substituted into the outer select, and the constant computed directly
 MetastoreRelation default, temp_shengli, None
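The substitution in case 3 can be sketched on a toy expression tree. These are stand-in classes; the real rule builds aliasMap from the inner projectList and then folds the resulting constant:

```scala
sealed trait Expr
case class Col(name: String) extends Expr
case class Lit(v: Int) extends Expr
case class Add(l: Expr, r: Expr) extends Expr

// Replace attributes produced by the inner projection with their defining
// expressions, e.g. SELECT c + 1 FROM (SELECT a + b AS c ...)
// becomes SELECT (a + b) + 1 ...
def substitute(outer: Expr, aliasMap: Map[String, Expr]): Expr = outer match {
  case Col(n) if aliasMap.contains(n) => aliasMap(n)
  case Add(l, r) => Add(substitute(l, aliasMap), substitute(r, aliasMap))
  case other     => other
}
```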
           

3. Summary

  This article described the Optimizer's role in Catalyst: it takes the Analyzed Logical Plan and applies Rules, via transform, to both the Logical Plan and its Expressions, merging and optimizing the tree's nodes. The main optimization strategies boil down to a few broad classes: operator merging, column pruning, and predicate pushdown.

  Catalyst keeps iterating; this article is based on Spark 1.0.0, and optimization strategies added later will be covered in follow-up posts.

  Discussion and feedback are welcome!

——EOF——

Original article; please attribute when reposting:

Reposted from OopsOutOfMemory (Shengli)'s blog, author: OopsOutOfMemory

Permalink: http://blog.csdn.net/oopsoom/article/details/38121259

Note: this article is released under the Attribution-NonCommercial-NoDerivs 2.5 China Mainland (CC BY-NC-ND 2.5 CN) license. Reposting, sharing, and comments are welcome, but please keep the author attribution and the article link. For commercial use or licensing questions, please contact me.

