Spark SQL 可以将資料緩存到記憶體中,我們可以見到的通過調用cache table tableName即可将一張表緩存到記憶體中,來極大的提高查詢效率。
這就涉及到記憶體中的資料的存儲形式,我們知道基于關系型的資料可以存儲為基于行存儲結構 或 者基于列存儲結構,或者基于行和列的混合存儲,即Row Based Storage、Column Based Storage、 PAX Storage。
Spark SQL 的記憶體資料是如何組織的?
Spark SQL 将資料加載到記憶體是以列的存儲結構。稱為In-Memory Columnar Storage。
若直接存儲Java Object 會産生很大的記憶體開銷,并且這樣是基于Row的存儲結構。查詢某些列速度略慢,雖然資料以及載入記憶體,查詢效率還是低于面向列的存儲結構。
基于Row的Java Object存儲:
記憶體開銷大,且容易FULL GC,按列查詢比較慢。
基于Column的ByteBuffer存儲(Spark SQL):
記憶體開銷小,按列查詢速度較快。
Spark SQL的In-Memory Columnar Storage是位于spark列下面org.apache.spark.sql.columnar包内:
核心的類有 ColumnBuilder, InMemoryColumnarTableScan, ColumnAccessor, ColumnType.
如果列有壓縮的情況:compression包下面有具體的build列和access列的類。
一、引子
當我們調用spark sql 裡的cache table command時,會生成一CacheCommand,這個Command是一個實體計劃。
scala> val cached = sql("cache table src")
cached: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
CacheCommand src, true
這裡列印出來tableName是src, 和一個是否要cache的boolean flag.
我們看下CacheCommand的構造:
CacheCommand支援2種操作,一種是把資料源加載帶記憶體中,一種是将資料源從記憶體中解除安裝。
對應于SQLContext下的cacheTable和uncacheTabele。
case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
extends LeafNode with Command {
override protected[sql] lazy val sideEffectResult = {
if (doCache) {
context.cacheTable(tableName) //緩存表到記憶體
} else {
context.uncacheTable(tableName)//從記憶體中移除該表的資料
}
Seq.empty[Any]
}
override def execute(): RDD[Row] = {
sideEffectResult
context.emptyResult
}
override def output: Seq[Attribute] = Seq.empty
}
如果調用cached.collect(),則會根據Command指令來執行cache或者uncache操作,這裡我們執行cache操作。
cached.collect()将會調用SQLContext下的cacheTable函數:
首先通過catalog查詢關系,構造一個SchemaRDD。
/** Returns the specified table as a SchemaRDD */
def table(tableName: String): SchemaRDD =
new SchemaRDD(this, catalog.lookupRelation(None, tableName))
找到該Schema的analyzed計劃。比對構造InMemoryRelation:
/** Caches the specified table in-memory. */
def cacheTable(tableName: String): Unit = {
val currentTable = table(tableName).queryExecution.analyzed //構造schemaRDD并将其執行analyze計劃操作
val asInMemoryRelation = currentTable match {
case _: InMemoryRelation => //如果已經是InMemoryRelation,則傳回
currentTable.logicalPlan
case _ => //如果不是(預設剛剛cache的時候是空的)則建構一個記憶體關系InMemoryRelation
InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)
}
//将建構好的InMemoryRelation注冊到catalog裡。
catalog.registerTable(None, tableName, asInMemoryRelation)
}
二、InMemoryRelation
InMemoryRelation繼承自LogicalPlan,是Spark1.1 Spark SQL裡新添加的一種TreeNode,也是catalyst裡的一種plan. 現在TreeNode變成了4種:
1、BinaryNode 二進制節點
2、LeafNode 葉子節點
3、UnaryNode 單孩子節點
4、InMemoryRelation 記憶體關系型節點
類圖如下:
值得注意的是,_cachedColumnBuffers這個類型為RDD[Array[ByteBuffer]]的私有字段。
這個封裝就是面向列的存儲ByteBuffer。前面提到相較于plain java object存儲記錄,用ByteBuffer能顯著的提高存儲效率,減少記憶體占用。并且按列查詢的速度會非常快。
InMemoryRelation具體實作如下:
構造一個InMemoryRelation需要該Relation的output Attributes,是否需要useCoompression來壓縮,預設為false,一次處理的多少行資料batchSize, child 即SparkPlan。
private[sql] case class InMemoryRelation(
output: Seq[Attribute], //輸出屬性,比如src表裡就是[key,value]
useCompression: Boolean, //操作時是否使用壓縮,預設false
batchSize: Int, //批的大小量
child: SparkPlan) //spark plan 具體child
可以通過設定:
spark.sql.inMemoryColumnarStorage.compressed 為true來設定記憶體中的列存儲是否需要壓縮。
spark.sql.inMemoryColumnarStorage.batchSize 來設定一次處理多少row
spark.sql.defaultSizeInBytes 來設定初始化的column的bufferbytes的預設大小,這裡隻是其中一個參數。
這些參數都可以在源碼中設定,都在SQL Conf
private[spark] object SQLConf {
val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
再回到case class InMemoryRelation:
_cachedColumnBuffers就是我們最終将table放入記憶體的存儲句柄,是一個RDD[Array[ByteBuffer]。
緩存主流程:
1、判斷_cachedColumnBuffers是否為null,如果不是null,則已經Cache了目前table,重複cache不會觸發cache操作。
2、child是SparkPlan,即執行hive table scan,測試我拿sbt/sbt hive/console裡test裡的src table為例,操作是掃描這張表。這個表有2個字的key是int, value 是string
3、拿到child的output, 這裡的output就是 key, value2個列。
4、執行mapPartitions操作,對目前RDD的每個分區的資料進行操作。
5、對于每一個分區,疊代裡面的資料生成新的Iterator。每個Iterator裡面是Array[ByteBuffer]
6、對于child.output的每一列,都會生成一個ColumnBuilder,最後組合為一個columnBuilders是一個數組。
7、數組内每個CommandBuilder持有一個ByteBuffer
8、周遊原始分區的記錄,将對于的行轉為列,并将資料存到ByteBuffer内。
9、最後将此RDD調用cache方法,将RDD緩存。
10、将cached賦給_cachedColumnBuffers。
此操作總結下來是:執行hive table scan操作,傳回的MapPartitionsRDD對其重新定義mapPartition方法,将其行轉列,并且最終cache到記憶體中。
所有流程如下:
// If the cached column buffers were not passed in, we calculate them in the constructor.
// As in Spark, the actual work of caching is lazy.
if (_cachedColumnBuffers == null) { //判斷是否已經cache了目前table
val output = child.output
/**
* child.output
res65: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(key#6, value#7)
*/
val cached = child.execute().mapPartitions { baseIterator =>
/**
* child.execute()是Row的集合,疊代Row
* res66: Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([238,val_238])
*
* val row1 = child.execute().take(1)
* res67: Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([238,val_238])
* */
/*
* 對每個Partition進行map,映射生成一個Iterator[Array[ByteBuffer],對應java的Iterator<List<ByteBuffer>>
* */
new Iterator[Array[ByteBuffer]] {
def next() = {
//周遊每一列,首先attribute是key 為 IntegerType ,然後attribute是value是String
//最後封裝成一個Array, index 0 是 IntColumnBuilder, 1 是StringColumnBuilder
val columnBuilders = output.map { attribute =>
val columnType = ColumnType(attribute.dataType)
val initialBufferSize = columnType.defaultSize * batchSize
ColumnBuilder(columnType.typeId, initialBufferSize, attribute.name, useCompression)
}.toArray
//src表裡Row是[238,val_238] 這行Row的length就是2
var row: Row = null
var rowCount = 0
//batchSize預設1000
while (baseIterator.hasNext && rowCount < batchSize) {
//周遊每一條記錄
row = baseIterator.next()
var i = 0
//這裡row length是2,i的取值是0 和 1
while (i < row.length) {
//擷取columnBuilders, 0是IntColumnBuilder,
//BasicColumnBuilder的appendFrom
//Appends `row(ordinal)` to the column builder.
columnBuilders(i).appendFrom(row, i)
i += 1
}
//該行已經插入完畢
rowCount += 1
}
//limit and rewind,Returns the final columnar byte buffer.
columnBuilders.map(_.build())
}
def hasNext = baseIterator.hasNext
}
}.cache()
cached.setName(child.toString)
_cachedColumnBuffers = cached
}
三、Columnar Storage
初始化ColumnBuilders:
val columnBuilders = output.map { attribute =>
val columnType = ColumnType(attribute.dataType)
val initialBufferSize = columnType.defaultSize * batchSize
ColumnBuilder(columnType.typeId, initialBufferSize, attribute.name, useCompression)
}.toArray
這裡會聲明一個數組,來對應每一列的存儲,如下圖:
然後初始化類型builder的時候會傳入的參數:
initialBufferSize:文章開頭的圖中會有ByteBuffer,ByteBuffer的初始化大小是如何計算的?
initialBufferSize = 列類型預設長度 × batchSize ,預設batchSize是1000
拿Int類型舉例,initialBufferSize of IntegerType = 4 * 1000
attribute.name即字段名age,name etc。。。
ColumnType:
ColumnType封裝了 該類型的 typeId 和 該類型的 defaultSize。并且提供了extract、append\getField方法,來向buffer裡追加和擷取資料。
如IntegerType typeId 為0, defaultSize 4 ......
詳細看下類圖,畫的不是非常嚴格的類圖,主要為了展示目前類型系統:
ColumnBuilder:
ColumnBuilder的主要職責是:管理ByteBuffer,包括初始化buffer,添加資料到buffer内,檢查剩餘空間,和申請新的空間這幾項主要職責。
initialize負責初始化buffer。
appendFrom是負責添加資料。
ensureFreeSpace確定buffer的長度動态增加。
類圖如下:
ByteBuffer的初始化過程:
初始化大小initialSize:拿Int舉例,在前面builder初始化傳入的是4×batchSize=4*1000,initialSize也就是4KB,如果沒有傳入initialSize,則預設是1024×1024。
列名稱,是否需要壓縮,都是需要傳入的。
ByteBuffer聲明時預留了4個位元組,為了放column type id,這個在ColumnType的構造裡有介紹過。
override def initialize(
initialSize: Int,
columnName: String = "",
useCompression: Boolean = false) = {
val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize //如果沒有預設1024×1024 byte
this.columnName = columnName
// Reserves 4 bytes for column type ID
buffer = ByteBuffer.allocate(4 + size * columnType.defaultSize) // buffer的初始化長度,需要加上4byte類型ID空間。
buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)//根據nativeOrder排序,然後首先放入typeId
}
存儲的方式如下:
Int的type id 是0, string的 type id 是 7. 後面就是實際存儲的資料了。
ByteBuffer寫入過程:
存儲結構都介紹完畢,最後開始對Table進行scan了,scan後對每一個分區的每個Row進行操作周遊:
1、讀每個分區的每條Row
2、擷取每個列的值,從builders數組裡找到索引 i 對應的bytebuffer,追加至bytebuffer。
while (baseIterator.hasNext && rowCount < batchSize) {
//周遊每一條記錄
row = baseIterator.next()
var i = 0
//這裡row length是2,i的取值是0 和 1 Ps:還是拿src table做測試,每一個Row隻有2個字段,key, value所有長度為2
while (i < row.length) {
//擷取columnBuilders, 0是IntColumnBuilder,
//BasicColumnBuilder的appendFrom
//Appends `row(ordinal)` to the column builder.
columnBuilders(i).appendFrom(row, i) //追加到對應的bytebuffer
i += 1
}
//該行已經插入完畢
rowCount += 1
}
//limit and rewind,Returns the final columnar byte buffer.
columnBuilders.map(_.build())
追加過程:
根據目前builder的類型,從row的對應索引中取出值,最後追加到builder的bytebuffer内。
override def appendFrom(row: Row, ordinal: Int) {
//ordinal是Row的index,0就是第一列值,1就是第二列值,擷取列的值為field
//最後在将該列的值put到該buffer内
val field = columnType.getField(row, ordinal)
buffer = ensureFreeSpace(buffer, columnType.actualSize(field))//動态擴容
columnType.append(field, buffer)
}
ensureFreeSpace:
主要是操作buffer,如果要追加的資料大于剩餘空間,就擴大buffer。
//確定剩餘空間能容下,如果剩餘空間小于 要放入的大小,則重新配置設定一看記憶體空間
private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
if (orig.remaining >= size) { //目前buffer剩餘空間比要追加的資料大,則什麼都不做,傳回自身
orig
} else { //否則擴容
// grow in steps of initial size
val capacity = orig.capacity()
val newSize = capacity + size.max(capacity / 8 + 1)
val pos = orig.position()
orig.clear()
ByteBuffer
.allocate(newSize)
.order(ByteOrder.nativeOrder())
.put(orig.array(), 0, pos)
}
}
......
最後調用MapPartitionsRDD.cache(),将該RDD緩存并添加到spark cache管理中。
至此,我們将一張spark sql table緩存到了spark的jvm中。
四、總結
對于資料的存儲結構,我們常常關注持久化的存儲結構,并且在長久時間内有了很多種高效結構。
但是在實時性的要求下,記憶體資料庫越來越被關注,如何優化記憶體資料庫的存儲結構,是一個重點,也是一個難點。
對于Spark SQL 和 Shark 裡的列存儲 是一種優化方案,提高了關系查詢中列查詢的速度,和減少了記憶體占用。但是中存儲方式還是比較簡單的,沒有額外的中繼資料和索引來提高查詢效率,希望以後能了解到更多的In-Memory Storage。
——EOF——
創文章,轉載請注明:
轉載自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory
本文連結位址:http://blog.csdn.net/oopsoom/article/details/39525483
注:本文基于署名-非商業性使用-禁止演繹 2.5 中國大陸(CC BY-NC-ND 2.5 CN)協定,歡迎轉載、轉發和評論,但是請保留本文作者署名和文章連結。如若需要用于商業目的或者與授權方面的協商,請聯系我。