天天看點

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 cache table

    Spark SQL 可以将資料緩存到記憶體中,我們可以見到的通過調用cache table tableName即可将一張表緩存到記憶體中,來極大的提高查詢效率。

    這就涉及到記憶體中的資料的存儲形式,我們知道基于關系型的資料可以存儲為基于行存儲結構 或 者基于列存儲結構,或者基于行和列的混合存儲,即Row Based Storage、Column Based Storage、 PAX Storage。

    Spark SQL 的記憶體資料是如何組織的?

    Spark SQL 将資料加載到記憶體是以列的存儲結構。稱為In-Memory Columnar Storage。

    若直接存儲Java Object 會産生很大的記憶體開銷,并且這樣是基于Row的存儲結構。查詢某些列速度略慢,雖然資料以及載入記憶體,查詢效率還是低于面向列的存儲結構。

基于Row的Java Object存儲:

記憶體開銷大,且容易FULL GC,按列查詢比較慢。

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 cache table

基于Column的ByteBuffer存儲(Spark SQL):

記憶體開銷小,按列查詢速度較快。

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 cache table

    Spark SQL的In-Memory Columnar Storage是位于spark列下面org.apache.spark.sql.columnar包内:

    核心的類有 ColumnBuilder,  InMemoryColumnarTableScan, ColumnAccessor, ColumnType.

    如果列有壓縮的情況:compression包下面有具體的build列和access列的類。

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 cache table

一、引子

    當我們調用spark sql 裡的cache table command時,會生成一CacheCommand,這個Command是一個實體計劃。

scala> val cached = sql("cache table src")
           
cached: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[0] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
CacheCommand src, true
           

這裡列印出來tableName是src, 和一個是否要cache的boolean flag.

我們看下CacheCommand的構造:

CacheCommand支援2種操作,一種是把資料源加載帶記憶體中,一種是将資料源從記憶體中解除安裝。

對應于SQLContext下的cacheTable和uncacheTabele。 

case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
  extends LeafNode with Command {

  override protected[sql] lazy val sideEffectResult = {
    if (doCache) {
      context.cacheTable(tableName) //緩存表到記憶體
    } else {
      context.uncacheTable(tableName)//從記憶體中移除該表的資料
    }
    Seq.empty[Any]
  }
  override def execute(): RDD[Row] = {
    sideEffectResult
    context.emptyResult
  }
  override def output: Seq[Attribute] = Seq.empty
}
           

如果調用cached.collect(),則會根據Command指令來執行cache或者uncache操作,這裡我們執行cache操作。

cached.collect()将會調用SQLContext下的cacheTable函數:

首先通過catalog查詢關系,構造一個SchemaRDD。

/** Returns the specified table as a SchemaRDD */
  def table(tableName: String): SchemaRDD =
    new SchemaRDD(this, catalog.lookupRelation(None, tableName))
           

找到該Schema的analyzed計劃。比對構造InMemoryRelation:

/** Caches the specified table in-memory. */
  def cacheTable(tableName: String): Unit = {
    val currentTable = table(tableName).queryExecution.analyzed //構造schemaRDD并将其執行analyze計劃操作
    val asInMemoryRelation = currentTable match {
      case _: InMemoryRelation => //如果已經是InMemoryRelation,則傳回
        currentTable.logicalPlan

      case _ => //如果不是(預設剛剛cache的時候是空的)則建構一個記憶體關系InMemoryRelation
        InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)
    }
    //将建構好的InMemoryRelation注冊到catalog裡。
    catalog.registerTable(None, tableName, asInMemoryRelation)
  }
           

二、InMemoryRelation

 InMemoryRelation繼承自LogicalPlan,是Spark1.1 Spark SQL裡新添加的一種TreeNode,也是catalyst裡的一種plan. 現在TreeNode變成了4種:

1、BinaryNode 二進制節點

2、LeafNode 葉子節點

3、UnaryNode 單孩子節點

4、InMemoryRelation 記憶體關系型節點

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 cache table

類圖如下:

值得注意的是,_cachedColumnBuffers這個類型為RDD[Array[ByteBuffer]]的私有字段。

這個封裝就是面向列的存儲ByteBuffer。前面提到相較于plain java object存儲記錄,用ByteBuffer能顯著的提高存儲效率,減少記憶體占用。并且按列查詢的速度會非常快。

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 cache table

InMemoryRelation具體實作如下:

構造一個InMemoryRelation需要該Relation的output Attributes,是否需要useCoompression來壓縮,預設為false,一次處理的多少行資料batchSize, child 即SparkPlan。

private[sql] case class InMemoryRelation(
    output: Seq[Attribute], //輸出屬性,比如src表裡就是[key,value]
    useCompression: Boolean, //操作時是否使用壓縮,預設false
    batchSize: Int, //批的大小量
    child: SparkPlan) //spark plan 具體child
           

可以通過設定:

spark.sql.inMemoryColumnarStorage.compressed 為true來設定記憶體中的列存儲是否需要壓縮。

spark.sql.inMemoryColumnarStorage.batchSize 來設定一次處理多少row

spark.sql.defaultSizeInBytes 來設定初始化的column的bufferbytes的預設大小,這裡隻是其中一個參數。

這些參數都可以在源碼中設定,都在SQL Conf

private[spark] object SQLConf {
  val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
  val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize" 
  val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
           

 再回到case class InMemoryRelation:

_cachedColumnBuffers就是我們最終将table放入記憶體的存儲句柄,是一個RDD[Array[ByteBuffer]。

緩存主流程:

1、判斷_cachedColumnBuffers是否為null,如果不是null,則已經Cache了目前table,重複cache不會觸發cache操作。

2、child是SparkPlan,即執行hive table scan,測試我拿sbt/sbt hive/console裡test裡的src table為例,操作是掃描這張表。這個表有2個字的key是int, value 是string

3、拿到child的output, 這裡的output就是 key, value2個列。

4、執行mapPartitions操作,對目前RDD的每個分區的資料進行操作。

5、對于每一個分區,疊代裡面的資料生成新的Iterator。每個Iterator裡面是Array[ByteBuffer]

6、對于child.output的每一列,都會生成一個ColumnBuilder,最後組合為一個columnBuilders是一個數組。

7、數組内每個CommandBuilder持有一個ByteBuffer

8、周遊原始分區的記錄,将對于的行轉為列,并将資料存到ByteBuffer内。

9、最後将此RDD調用cache方法,将RDD緩存。

10、将cached賦給_cachedColumnBuffers。

此操作總結下來是:執行hive table scan操作,傳回的MapPartitionsRDD對其重新定義mapPartition方法,将其行轉列,并且最終cache到記憶體中。

所有流程如下:

// If the cached column buffers were not passed in, we calculate them in the constructor.
  // As in Spark, the actual work of caching is lazy.
  if (_cachedColumnBuffers == null) { //判斷是否已經cache了目前table
    val output = child.output
      /**
           * child.output
          res65: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(key#6, value#7)
           */
    val cached = child.execute().mapPartitions { baseIterator =>
      /**
       * child.execute()是Row的集合,疊代Row
       * res66: Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([238,val_238])
       * 
       * val row1 = child.execute().take(1)
       * res67: Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([238,val_238])
       * */
      /*
       * 對每個Partition進行map,映射生成一個Iterator[Array[ByteBuffer],對應java的Iterator<List<ByteBuffer>>
       * */
      new Iterator[Array[ByteBuffer]] {
        def next() = {
          //周遊每一列,首先attribute是key 為 IntegerType ,然後attribute是value是String
          //最後封裝成一個Array, index 0 是 IntColumnBuilder, 1 是StringColumnBuilder
          val columnBuilders = output.map { attribute =>
            val columnType = ColumnType(attribute.dataType)
            val initialBufferSize = columnType.defaultSize * batchSize
            ColumnBuilder(columnType.typeId, initialBufferSize, attribute.name, useCompression)
          }.toArray
          //src表裡Row是[238,val_238] 這行Row的length就是2
          var row: Row = null
          var rowCount = 0
          //batchSize預設1000
          while (baseIterator.hasNext && rowCount < batchSize) {
            //周遊每一條記錄
            row = baseIterator.next()
            var i = 0
            //這裡row length是2,i的取值是0 和 1
            while (i < row.length) {
              //擷取columnBuilders, 0是IntColumnBuilder, 
              //BasicColumnBuilder的appendFrom
              //Appends `row(ordinal)` to the column builder.
              columnBuilders(i).appendFrom(row, i)
              i += 1
            }
            //該行已經插入完畢
            rowCount += 1
          }
          //limit and rewind,Returns the final columnar byte buffer.
          columnBuilders.map(_.build())
        }

        def hasNext = baseIterator.hasNext
      }
    }.cache()

    cached.setName(child.toString)
    _cachedColumnBuffers = cached
  }
           

三、Columnar Storage

初始化ColumnBuilders:

val columnBuilders = output.map { attribute =>
                val columnType = ColumnType(attribute.dataType)
                val initialBufferSize = columnType.defaultSize * batchSize
                ColumnBuilder(columnType.typeId, initialBufferSize, attribute.name, useCompression)
              }.toArray
           

這裡會聲明一個數組,來對應每一列的存儲,如下圖:

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 cache table

然後初始化類型builder的時候會傳入的參數:

initialBufferSize:文章開頭的圖中會有ByteBuffer,ByteBuffer的初始化大小是如何計算的?

initialBufferSize = 列類型預設長度 × batchSize ,預設batchSize是1000

拿Int類型舉例,initialBufferSize of IntegerType = 4 * 1000 

attribute.name即字段名age,name etc。。。

ColumnType:

ColumnType封裝了 該類型的 typeId  和  該類型的 defaultSize。并且提供了extract、append\getField方法,來向buffer裡追加和擷取資料。

如IntegerType  typeId 為0, defaultSize 4 ......

詳細看下類圖,畫的不是非常嚴格的類圖,主要為了展示目前類型系統:

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 cache table

ColumnBuilder:

ColumnBuilder的主要職責是:管理ByteBuffer,包括初始化buffer,添加資料到buffer内,檢查剩餘空間,和申請新的空間這幾項主要職責。

initialize負責初始化buffer。

appendFrom是負責添加資料。

ensureFreeSpace確定buffer的長度動态增加。

類圖如下:

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 cache table

ByteBuffer的初始化過程:

初始化大小initialSize:拿Int舉例,在前面builder初始化傳入的是4×batchSize=4*1000,initialSize也就是4KB,如果沒有傳入initialSize,則預設是1024×1024。

列名稱,是否需要壓縮,都是需要傳入的。

ByteBuffer聲明時預留了4個位元組,為了放column type id,這個在ColumnType的構造裡有介紹過。

override def initialize(
      initialSize: Int,
      columnName: String = "",
      useCompression: Boolean = false) = {

    val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize //如果沒有預設1024×1024 byte
    this.columnName = columnName

    // Reserves 4 bytes for column type ID
    buffer = ByteBuffer.allocate(4 + size * columnType.defaultSize) // buffer的初始化長度,需要加上4byte類型ID空間。
    buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)//根據nativeOrder排序,然後首先放入typeId
  }
           

存儲的方式如下:

Int的type id 是0, string的 type id 是 7. 後面就是實際存儲的資料了。

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 cache table

ByteBuffer寫入過程:

存儲結構都介紹完畢,最後開始對Table進行scan了,scan後對每一個分區的每個Row進行操作周遊:

1、讀每個分區的每條Row

2、擷取每個列的值,從builders數組裡找到索引 i 對應的bytebuffer,追加至bytebuffer。

while (baseIterator.hasNext && rowCount < batchSize) {
	            //周遊每一條記錄
	            row = baseIterator.next()
	            var i = 0
	            //這裡row length是2,i的取值是0 和 1 Ps:還是拿src table做測試,每一個Row隻有2個字段,key, value所有長度為2
	            while (i < row.length) {
	              //擷取columnBuilders, 0是IntColumnBuilder, 
	              //BasicColumnBuilder的appendFrom
	              //Appends `row(ordinal)` to the column builder.
	              columnBuilders(i).appendFrom(row, i) //追加到對應的bytebuffer
	              i += 1
	            }
	            //該行已經插入完畢
	            rowCount += 1
	          }
	          //limit and rewind,Returns the final columnar byte buffer.
	          columnBuilders.map(_.build())
           

追加過程:

根據目前builder的類型,從row的對應索引中取出值,最後追加到builder的bytebuffer内。

override def appendFrom(row: Row, ordinal: Int) {
    //ordinal是Row的index,0就是第一列值,1就是第二列值,擷取列的值為field
    //最後在将該列的值put到該buffer内
    val field = columnType.getField(row, ordinal)
    buffer = ensureFreeSpace(buffer, columnType.actualSize(field))//動态擴容
    columnType.append(field, buffer)
  }
           

ensureFreeSpace:

主要是操作buffer,如果要追加的資料大于剩餘空間,就擴大buffer。

//確定剩餘空間能容下,如果剩餘空間小于 要放入的大小,則重新配置設定一看記憶體空間
  private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
    if (orig.remaining >= size) { //目前buffer剩餘空間比要追加的資料大,則什麼都不做,傳回自身
      orig
    } else { //否則擴容
      // grow in steps of initial size
      val capacity = orig.capacity()
      val newSize = capacity + size.max(capacity / 8 + 1)
      val pos = orig.position()

      orig.clear()
      ByteBuffer
        .allocate(newSize)
        .order(ByteOrder.nativeOrder())
        .put(orig.array(), 0, pos)
    }
  }
           

......

最後調用MapPartitionsRDD.cache(),将該RDD緩存并添加到spark cache管理中。

至此,我們将一張spark sql table緩存到了spark的jvm中。

四、總結

    對于資料的存儲結構,我們常常關注持久化的存儲結構,并且在長久時間内有了很多種高效結構。

    但是在實時性的要求下,記憶體資料庫越來越被關注,如何優化記憶體資料庫的存儲結構,是一個重點,也是一個難點。

    對于Spark SQL 和 Shark 裡的列存儲 是一種優化方案,提高了關系查詢中列查詢的速度,和減少了記憶體占用。但是中存儲方式還是比較簡單的,沒有額外的中繼資料和索引來提高查詢效率,希望以後能了解到更多的In-Memory Storage。

——EOF——

創文章,轉載請注明:

轉載自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文連結位址:http://blog.csdn.net/oopsoom/article/details/39525483

注:本文基于署名-非商業性使用-禁止演繹 2.5 中國大陸(CC BY-NC-ND 2.5 CN)協定,歡迎轉載、轉發和評論,但是請保留本文作者署名和文章連結。如若需要用于商業目的或者與授權方面的協商,請聯系我。

Spark SQL 源碼分析之 In-Memory Columnar Storage 之 cache table

繼續閱讀