天天看點

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 in-memory query

    前面講到了Spark SQL In-Memory Columnar Storage的存儲結構是基于列存儲的。

    那麼基于以上存儲結構,我們查詢cache在jvm内的資料又是如何查詢的,本文将揭示查詢In-Memory Data的方式。

一、引子

本例使用hive console裡查詢cache後的src表。 select value from src

當我們将src表cache到了記憶體後,再次查詢src,可以通過analyzed執行計劃來觀察内部調用。

即parse後,會形成InMemoryRelation結點,最後執行實體計劃時,會調用InMemoryColumnarTableScan這個結點的方法。

如下:

scala> val exe = executePlan(sql("select value from src").queryExecution.analyzed)
14/09/26 10:30:26 INFO parse.ParseDriver: Parsing command: select value from src
14/09/26 10:30:26 INFO parse.ParseDriver: Parse Completed
exe: org.apache.spark.sql.hive.test.TestHive.QueryExecution = 
== Parsed Logical Plan ==
Project [value#5]
 InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

== Analyzed Logical Plan ==
Project [value#5]
 InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

== Optimized Logical Plan ==
Project [value#5]
 InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

== Physical Plan ==
InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)) //查詢記憶體中表的入口

Code Generation: false
== RDD ==
           

二、InMemoryColumnarTableScan

InMemoryColumnarTableScan是Catalyst裡的一個葉子結點,包含了要查詢的attributes,和InMemoryRelation(封裝了我們緩存的In-Columnar Storage資料結構)。

執行葉子節點,出發execute方法對記憶體資料進行查詢。 1、查詢時,調用InMemoryRelation,對其封裝的記憶體資料結構的每個分區進行操作。 2、擷取要請求的attributes,如上,查詢請求的是src表的value屬性。 3、根據目的查詢表達式,來擷取在對應存儲結構中,請求列的index索引。 4、通過ColumnAccessor來對每個buffer進行通路,擷取對應查詢資料,并封裝為Row對象傳回。

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 in-memory query
private[sql] case class InMemoryColumnarTableScan(
    attributes: Seq[Attribute],
    relation: InMemoryRelation)
  extends LeafNode {


  override def output: Seq[Attribute] = attributes


  override def execute() = {
    relation.cachedColumnBuffers.mapPartitions { iterator =>
      // Find the ordinals of the requested columns.  If none are requested, use the first.
      val requestedColumns = if (attributes.isEmpty) {
        Seq(0)
      } else {
        attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) //根據表達式exprId找出對應列的ByteBuffer的索引
      }


      iterator
        .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_)))//根據索引取得對應請求列的ByteBuffer,并封裝為ColumnAccessor。
        .flatMap { columnAccessors =>
          val nextRow = new GenericMutableRow(columnAccessors.length) //Row的長度
          new Iterator[Row] {
            override def next() = {
              var i = 0
              while (i < nextRow.length) {
                columnAccessors(i).extractTo(nextRow, i) //根據對應index和長度,從byterbuffer裡取得值,封裝到row裡
                i += 1
              }
              nextRow
            }


            override def hasNext = columnAccessors.head.hasNext
          }
        }
    }
  }
}
           

查詢請求的列,如下:

scala> exe.optimizedPlan
res93: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [value#5]
 InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)


scala> val relation =  exe.optimizedPlan(1)
relation: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)


scala> val request_relation = exe.executedPlan
request_relation: org.apache.spark.sql.execution.SparkPlan = 
InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None))


scala> request_relation.output //請求的列,我們請求的隻有value列
res95: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)

scala> relation.output //預設儲存在relation中的所有列
res96: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(key#4, value#5)


scala> val attributes = request_relation.output 
attributes: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)
           

整個流程很簡潔,關鍵步驟是第三步。根據ExprId來查找到,請求列的索引 attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))

//根據exprId找出對應ID
scala> val attr_index = attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
attr_index: Seq[Int] = ArrayBuffer(1) //找到請求的列value的索引是1, 我們查詢就從Index為1的bytebuffer中,請求資料

scala> relation.output.foreach(e=>println(e.exprId))
ExprId(4)    //對應<span style="font-family: Arial, Helvetica, sans-serif;">[key#4,value#5]</span>
ExprId(5)

scala> request_relation.output.foreach(e=>println(e.exprId))
ExprId(5)
           

三、ColumnAccessor

ColumnAccessor對應每一種類型,類圖如下:

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 in-memory query

最後傳回一個新的疊代器:

new Iterator[Row] {
            override def next() = {
              var i = 0
              while (i < nextRow.length) { //請求列的長度
                columnAccessors(i).extractTo(nextRow, i)//調用columnType.setField(row, ordinal, extractSingle(buffer))解析buffer
                i += 1
              }
              nextRow//傳回解析後的row
            }

            override def hasNext = columnAccessors.head.hasNext
          }
           

四、總結

    Spark SQL In-Memory Columnar Storage的查詢相對來說還是比較簡單的,其查詢思想主要和存儲的資料結構有關。

    即存儲時,按每列放到一個bytebuffer,形成一個bytebuffer數組。

    查詢時,根據請求列的exprId查找到上述數組的索引,然後使用ColumnAccessor對buffer中字段進行解析,最後封裝為Row對象,傳回。

——EOF——

創文章,轉載請注明:

轉載自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文連結位址:http://blog.csdn.net/oopsoom/article/details/39577419

注:本文基于署名-非商業性使用-禁止演繹 2.5 中國大陸(CC BY-NC-ND 2.5 CN)協定,歡迎轉載、轉發和評論,但是請保留本文作者署名和文章連結。如若需要用于商業目的或者與授權方面的協商,請聯系我。

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 in-memory query

繼續閱讀