天天看點

Spark SQL Catalyst源碼分析之UDF

在SQL的世界裡,除了官方提供的常用的處理函數之外,一般都會提供可擴充的對外自定義函數接口,這已經成為一種事實的标準。

  在前面Spark SQL源碼分析之核心流程一文中,已經介紹了Spark SQL Catalyst Analyzer的作用,其中包含了ResolveFunctions這個解析函數的功能。但是随着Spark1.1版本的釋出,Spark SQL的代碼有很多新完善和新功能了,和我先前基于1.0的源碼分析多少有些不同,比如支援UDF:

  spark1.0及以前的實作:

protected[sql] lazy val catalog: Catalog = new SimpleCatalog
  @transient
  protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true) //EmptyFunctionRegistry空實作
  @transient
  protected[sql] val optimizer = Optimizer
           

  Spark1.1及以後的實作:

protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry //SimpleFunctionRegistry實作,支援簡單的UDF

  @transient
  protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, functionRegistry, caseSensitive = true)
           

一、引子:

  對于SQL語句中的函數,會經過SqlParser的的解析成UnresolvedFunction。UnresolvedFunction最後會被Analyzer解析。

 SqlParser:

 除了非官方定義的函數外,還可以定義自定義函數,sql parser會進行解析。

ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ {
      case udfName ~ _ ~ exprs => UnresolvedFunction(udfName, exprs)
           

  将SqlParser傳入的udfName和exprs封裝成一個class class UnresolvedFunction繼承自Expression。

  隻是這個Expression的dataType等一系列屬性和eval計算方法均無法通路,強制通路會抛出異常,因為它沒有被Resolved,隻是一個載體。

case class UnresolvedFunction(name: String, children: Seq[Expression]) extends Expression {
  override def dataType = throw new UnresolvedException(this, "dataType")
  override def foldable = throw new UnresolvedException(this, "foldable")
  override def nullable = throw new UnresolvedException(this, "nullable")
  override lazy val resolved = false

  // Unresolved functions are transient at compile time and don't get evaluated during execution.
  override def eval(input: Row = null): EvaluatedType =
    throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

  override def toString = s"'$name(${children.mkString(",")})"
}<strong></strong>
           

Analyzer:

  Analyzer初始化的時候會需要Catalog,database和table的中繼資料關系,以及FunctionRegistry來維護UDF名稱和UDF實作的中繼資料,這裡使用SimpleFunctionRegistry。

/**
   * Replaces [[UnresolvedFunction]]s with concrete [[catalyst.expressions.Expression Expressions]].
   */
  object ResolveFunctions extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case q: LogicalPlan =>
        q transformExpressions { //對目前LogicalPlan進行transformExpressions操作
          case u @ UnresolvedFunction(name, children) if u.childrenResolved => //如果周遊到了UnresolvedFunction
            registry.lookupFunction(name, children) //從UDF中繼資料表裡查找udf函數
        }
    }
  }
           

二、UDF注冊

2.1 UDFRegistration

Spark SQL Catalyst源碼分析之UDF

  registerFunction("len", (x:String)=>x.length)

  registerFunction是UDFRegistration下的方法,SQLContext現在實作了UDFRegistration這個trait,隻要導入SQLContext,即可以使用udf功能。

  UDFRegistration核心方法registerFunction:

  registerFunction方法簽名def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit

  接受一個udfName 和 一個FunctionN,可以是Function1 到Function22。即這個udf的參數隻支援1-22個。(scala的痛啊)

  内部builder通過ScalaUdf來構造一個Expression,這裡ScalaUdf繼承自Expression(可以簡單的了解目前的SimpleUDF即是一個Catalyst的一個Expression),傳入scala的function作為UDF的實作,并且用反射檢查字段類型是否是Catalyst允許的,見ScalaReflection.

def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit = {
    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)//構造Expression
    functionRegistry.registerFunction(name, builder)//向SQLContext的functionRegistry(維護了一個hashMap來管理udf映射)注冊
  }
           

2.2 注冊Function:

注意:這裡FunctionBuilder是一個type FunctionBuilder = Seq[Expression] => Expression

class SimpleFunctionRegistry extends FunctionRegistry {
  val functionBuilders = new mutable.HashMap[String, FunctionBuilder]() //udf映射關系維護[udfName,Expression]

  def registerFunction(name: String, builder: FunctionBuilder) = { //put expression進Map
    functionBuilders.put(name, builder)
  }

  override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
    functionBuilders(name)(children) //查找udf,傳回Expression
  }
}
           

至此,我們将一個scala function注冊為一個catalyst的一個Expression,這就是spark的simple udf。

三、UDF計算:

UDF既然已經被封裝為catalyst樹裡的一個Expression節點,那麼計算的時候也就是計算ScalaUdf的eval方法。

先通過Row和表達式計算function所需要的參數,最後通過反射調用function,來達到計算udf的目的。

 ScalaUdf繼承自Expression:

Spark SQL Catalyst源碼分析之UDF

scalaUdf接受一個function, dataType,和一系清單達式。

比較簡單,看注釋即可:

case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])
  extends Expression {

  type EvaluatedType = Any

  def nullable = true

  override def toString = s"scalaUDF(${children.mkString(",")})"
 override def eval(input: Row): Any = {
    val result = children.size match {
      case 0 => function.asInstanceOf[() => Any]()
      case 1 => function.asInstanceOf[(Any) => Any](children(0).eval(input)) //反射調用function
      case 2 =>
        function.asInstanceOf[(Any, Any) => Any](
          children(0).eval(input), //表達式參數計算
          children(1).eval(input))
      case 3 =>
        function.asInstanceOf[(Any, Any, Any) => Any](
          children(0).eval(input),
          children(1).eval(input),
          children(2).eval(input))
      case 4 =>
     ......
       case 22 => //scala function隻支援22個參數,這裡枚舉了。
        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
          children(0).eval(input),
          children(1).eval(input),
          children(2).eval(input),
          children(3).eval(input),
          children(4).eval(input),
          children(5).eval(input),
          children(6).eval(input),
          children(7).eval(input),
          children(8).eval(input),
          children(9).eval(input),
          children(10).eval(input),
          children(11).eval(input),
          children(12).eval(input),
          children(13).eval(input),
          children(14).eval(input),
          children(15).eval(input),
          children(16).eval(input),
          children(17).eval(input),
          children(18).eval(input),
          children(19).eval(input),
          children(20).eval(input),
          children(21).eval(input))
           

四、總結

    Spark目前的UDF其實就是scala function。将scala function封裝到一個Catalyst Expression當中,在進行sql計算時,使用同樣的Eval方法對目前輸入Row進行計算。

    編寫一個spark udf非常簡單,隻需給UDF起個函數名,并且傳遞一個scala function即可。依靠scala函數程式設計的表現能力,使得編寫scala udf比較簡單,且相較hive的udf更容易使人了解。

——EOF——

原創文章,轉載請注明:

轉載自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文連結位址:http://blog.csdn.net/oopsoom/article/details/39395641

注:本文基于署名-非商業性使用-禁止演繹 2.5 中國大陸(CC BY-NC-ND 2.5 CN)協定,歡迎轉載、轉發和評論,但是請保留本文作者署名和文章連結。如若需要用于商業目的或者與授權方面的協商,請聯系我。

Spark SQL Catalyst源碼分析之UDF

繼續閱讀