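Following the previous article, Spark SQL Catalyst Source Code Analysis: Physical Plan, this article covers the implementation details of the Physical Plan's toRDD.
As we know, a SQL query is evaluated lazily: the Spark job only runs when you call an action such as collect() on it. The RDD itself is obtained by executing the physical plan: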
lazy val toRdd: RDD[Row] = executedPlan.execute()
A Spark Plan consists of four kinds of operators: the basic operators (BasicOperator), plus the somewhat more complex Join, Aggregate and Sort.
As shown in the figure:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIyVGduV2QvwVe0lmdhJ3ZvwFM38CXlZHbvN3cpR2Lc1TPB10QGtWUCpEMJ9CXsxWam9CXwADNvwVZ6l2c052bm9CXUJDT1wkNhVzLcRnbvZ2Lc1TPRJmd5IzY3ljMiZXUYpVd1kmYr50MZV3YyI2cKJDT29GRjBjUIF2LcRHelR3LcJzLctmch1mclRXY39jN4EDM1kTMxEDMzcDM0EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
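I. BasicOperator
1.1 Project
Project takes a list of expressions, Seq[NamedExpression], and for each input Row evaluates them (Expression.eval) to produce a new Row. It is implemented by calling child.execute() and then calling mapPartitions to process each partition.
The function f passed to mapPartitions creates a MutableProjection and then applies it to every row of the partition in turn.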
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
  override def output = projectList.map(_.toAttribute)
  override def execute() = child.execute().mapPartitions { iter => // apply f to each partition
    @transient val reusableProjection = new MutableProjection(projectList)
    iter.map(reusableProjection)
  }
}
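Looking at the definition of MutableProjection, it boils down to binding references to a schema and then calling eval: it converts one Row into another Row whose schema columns are already defined.
If the input Row has a schema, the Seq[Expression] passed in is bound to that schema as well.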
case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
    this(expressions.map(BindReferences.bindReference(_, inputSchema))) // bind to the schema
  private[this] val exprArray = expressions.toArray
  private[this] val mutableRow = new GenericMutableRow(exprArray.size) // the output Row, reused across calls
  def currentValue: Row = mutableRow
  def apply(input: Row): Row = {
    var i = 0
    while (i < exprArray.length) {
      mutableRow(i) = exprArray(i).eval(input) // evaluate each expression against the input Row
      i += 1
    }
    mutableRow // return the (reused) output Row
  }
}
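Because MutableProjection returns the same mutableRow instance on every call, a row must be copied before it is buffered. The following is a minimal sketch of that behaviour in plain Scala, without Spark; Row, Expr and ReusableProjection are hypothetical stand-ins, not Catalyst classes.

```scala
object ProjectionSketch {
  // Hypothetical stand-ins: a row is an array of values,
  // an expression is a function of the input row.
  type Row = Array[Any]
  type Expr = Row => Any

  // Same idea as MutableProjection: one output buffer, overwritten on every call.
  final class ReusableProjection(exprs: Seq[Expr]) extends (Row => Row) {
    private val exprArray = exprs.toArray
    private val buffer = new Array[Any](exprArray.length)
    def apply(input: Row): Row = {
      var i = 0
      while (i < exprArray.length) {
        buffer(i) = exprArray(i)(input)
        i += 1
      }
      buffer // the same array instance every time
    }
  }

  // Roughly "SELECT col0 + 1, col1" over two input rows.
  val exprs: Seq[Expr] = Seq(r => r(0).asInstanceOf[Int] + 1, r => r(1))
  val input: List[Row] = List(Array[Any](1, "a"), Array[Any](2, "b"))

  // Consuming rows one at a time, as iter.map does, is safe.
  def streamed: List[String] = {
    val p = new ReusableProjection(exprs)
    input.iterator.map(p).map(_.mkString(",")).toList
  }

  // Buffering without copying keeps two references to one mutable buffer.
  def bufferedWithoutCopy: List[String] = {
    val p = new ReusableProjection(exprs)
    input.map(p).map(_.mkString(","))
  }

  // Copying each row before buffering preserves the values.
  def bufferedWithCopy: List[String] = {
    val p = new ReusableProjection(exprs)
    input.map(r => p(r).clone()).map(_.mkString(","))
  }
}
```

This is presumably why operators later in this article that buffer rows (Limit, TakeOrdered, Sort, the hash table in HashJoin) call copy() on each row first.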
1.2 Filter
Filter evaluates the given condition against each input row (eval), which yields a Boolean. If the expression evaluates to true, the row is kept in its partition; otherwise it is filtered out.
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output = child.output
  override def execute() = child.execute().mapPartitions { iter =>
    iter.filter(condition.eval(_).asInstanceOf[Boolean]) // evaluate the expression: eval(input row)
  }
}
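A minimal sketch of the same per-partition filtering in plain Scala. The Row alias and the condition function are hypothetical; the condition returns Any, as eval does, and may return null. In Scala, casting a null reference with asInstanceOf[Boolean] yields false, so in this sketch a row whose condition evaluates to null is dropped as well.

```scala
object FilterSketch {
  type Row = Seq[Any] // hypothetical stand-in for a Catalyst Row

  // Hypothetical predicate "col0 > 10"; yields null when col0 is null.
  def condition(row: Row): Any = row(0) match {
    case null   => null
    case n: Int => n > 10
    case _      => false
  }

  // What Filter does inside mapPartitions: keep rows whose condition is true.
  def filterPartition(iter: Iterator[Row]): Iterator[Row] =
    iter.filter(row => condition(row).asInstanceOf[Boolean])

  val kept: List[Row] =
    filterPartition(Iterator(Seq(5), Seq(42), Seq(null), Seq(11))).toList
}
```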
1.3 Sample
Sample calls child.execute(), which returns an RDD, and then calls that RDD's native sample method.
case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan)
  extends UnaryNode
{
  override def output = child.output
  // TODO: How to pick seed?
  override def execute() = child.execute().sample(withReplacement, fraction, seed)
}
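1.4 Union
Union supports the union of several subqueries, so its children argument is a Seq[SparkPlan]. The execute() method calls execute() on every child, which gives the result RDD of each select query, and then merges all the subquery results by calling SparkContext's union method.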
case class Union(children: Seq[SparkPlan])(@transient sqlContext: SQLContext) extends SparkPlan {
  // TODO: attributes output by union should be distinct for nullability purposes
  override def output = children.head.output
  override def execute() = sqlContext.sparkContext.union(children.map(_.execute())) // union the subquery results
  override def otherCopyArgs = sqlContext :: Nil
}
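1.5 Limit
The native RDD API has a counterpart to Limit, namely take(). The Limit operator, however, handles two cases:
The first is when limit is the final operator, as in select xxx from yyy limit zzz, and it is invoked through executeCollect. In that case take is called directly from the driver.
The second is when limit is not the final operator, that is, more of the query follows it. In that case the limit is applied inside every partition, and the results are then repartitioned into a single partition to compute the global limit.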
case class Limit(limit: Int, child: SparkPlan)(@transient sqlContext: SQLContext)
  extends UnaryNode {
  // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
  // partition local limit -> exchange into one partition -> partition local limit again
  override def otherCopyArgs = sqlContext :: Nil
  override def output = child.output
  override def executeCollect() = child.execute().map(_.copy()).take(limit) // call take directly from the driver
  override def execute() = {
    val rdd = child.execute().mapPartitions { iter =>
      val mutablePair = new MutablePair[Boolean, Row]()
      iter.take(limit).map(row => mutablePair.update(false, row)) // apply the limit within each partition first
    }
    val part = new HashPartitioner(1)
    // a shuffle is needed to repartition into a single partition
    val shuffled = new ShuffledRDD[Boolean, Row, Row, MutablePair[Boolean, Row]](rdd, part)
    shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
    shuffled.mapPartitions(_.take(limit).map(_._2)) // finally, take the limit in the single partition
  }
}
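The non-terminal case can be sketched without Spark by modelling partitions as nested sequences. This is only an illustration of the two-phase idea; twoPhaseLimit is a hypothetical name, and flatten stands in for the shuffle into one partition (a real shuffle does not guarantee this row order).

```scala
object LimitSketch {
  // Returns (number of rows that would be shuffled, final result).
  def twoPhaseLimit[T](partitions: Seq[Seq[T]], limit: Int): (Int, Seq[T]) = {
    // Phase 1: local limit, so each partition ships at most `limit` rows.
    val local = partitions.map(_.take(limit))
    // Stand-in for the shuffle into a single partition.
    val singlePartition = local.flatten
    // Phase 2: global limit inside that single partition.
    (singlePartition.size, singlePartition.take(limit))
  }
}
```

For example, with partitions Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7), Seq(8)) and a limit of 2, only five of the eight rows are moved before the final take.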
1.6 TakeOrdered
TakeOrdered is a limit N applied after sorting. It is typically used for a limit that follows a sort by operator, and can simply be thought of as a TopN operator.
case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
  (@transient sqlContext: SQLContext) extends UnaryNode {
  override def otherCopyArgs = sqlContext :: Nil
  override def output = child.output
  @transient
  lazy val ordering = new RowOrdering(sortOrder) // the ordering is implemented by RowOrdering
  override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ordering)
  // TODO: Terminal split should be implemented differently from non-terminal split.
  // TODO: Pick num splits based on |limit|.
  override def execute() = sqlContext.sparkContext.makeRDD(executeCollect(), 1)
}
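A TopN over partitioned data does not need a full global sort: each partition can keep its own best N, and only those candidates need to be merged. The sketch below illustrates that idea in plain Scala with partitions modelled as nested sequences. It is an assumption-laden illustration of the technique, not the code of RDD.takeOrdered.

```scala
object TakeOrderedSketch {
  def takeOrdered[T](partitions: Seq[Seq[T]], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
    // Per partition: keep only the n smallest rows under `ord`.
    val candidates = partitions.map(_.sorted(ord).take(n))
    // Merge the candidates and keep the overall n smallest.
    candidates.flatten.sorted(ord).take(n)
  }
}
```

Passing a reversed ordering turns the same function into "largest N", which is the role sortOrder plays for TakeOrdered.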
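1.7 Sort
Sort also relies on the RowOrdering class. It maps over each partition of child.execute() and sorts the rows of that partition according to the RowOrdering, producing a new ordered collection. The sorting itself is done by copying the rows into an array and calling Scala's sorted method on it, not by an RDD-level sort. When global is true, requiredChildDistribution asks for an OrderedDistribution on the sort keys.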
case class Sort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode {
  override def requiredChildDistribution =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
  @transient
  lazy val ordering = new RowOrdering(sortOrder) // the sort order
  override def execute() = attachTree(this, "sort") {
    // TODO: Optimize sorting operation?
    child.execute()
      .mapPartitions(
        // sort each partition by calling sorted with the ordering
        iterator => iterator.map(_.copy()).toArray.sorted(ordering).iterator,
        preservesPartitioning = true)
  }
  override def output = child.output
}
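To make the role of the ordering concrete, here is a small sketch of a multi-column row ordering in plain Scala. SortKey, Direction and SimpleRowOrdering are hypothetical simplifications (a column index plus a direction, Int columns only); the real RowOrdering works on SortOrder expressions.

```scala
object RowOrderingSketch {
  type Row = Seq[Any] // hypothetical stand-in for a Catalyst Row

  sealed trait Direction
  case object Ascending extends Direction
  case object Descending extends Direction

  // Hypothetical simplification of SortOrder: column index plus direction.
  final case class SortKey(ordinal: Int, direction: Direction)

  // Compares rows key by key; the first non-equal key decides.
  final class SimpleRowOrdering(keys: Seq[SortKey]) extends Ordering[Row] {
    def compare(a: Row, b: Row): Int =
      keys.iterator.map { k =>
        val c = Integer.compare(
          a(k.ordinal).asInstanceOf[Int],
          b(k.ordinal).asInstanceOf[Int])
        if (k.direction == Ascending) c else -c
      }.find(_ != 0).getOrElse(0)
  }

  val rows: Seq[Row] = Seq(Seq(1, 20), Seq(2, 10), Seq(1, 30))

  // Same shape as the partition-level sort: toArray, then sorted(ordering).
  val sortedRows: List[Row] =
    rows.toArray
      .sorted(new SimpleRowOrdering(Seq(SortKey(0, Ascending), SortKey(1, Descending))))
      .toList
}
```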
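1.8 ExistingRdd
ExistingRdd wraps an RDD that already exists. Judging from the code below, its companion object provides helpers that convert an RDD of Scala Product objects (such as case classes) into an RDD[Row].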
object ExistingRdd {
  def convertToCatalyst(a: Any): Any = a match {
    case o: Option[_] => o.orNull
    case s: Seq[Any] => s.map(convertToCatalyst)
    case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray)
    case other => other
  }
  def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
    data.mapPartitions { iterator =>
      if (iterator.isEmpty) {
        Iterator.empty
      } else {
        val bufferedIterator = iterator.buffered
        val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)
        bufferedIterator.map { r =>
          var i = 0
          while (i < mutableRow.length) {
            mutableRow(i) = convertToCatalyst(r.productElement(i))
            i += 1
          }
          mutableRow
        }
      }
    }
  }
  def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
    ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
  }
}
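The conversion rules in convertToCatalyst can be tried out without Spark. In the sketch below a Vector stands in for GenericRow, and Person is a hypothetical case class; the order of the cases matters because Option and List values are themselves Products.

```scala
object ConvertSketch {
  // Hypothetical record type used only for this illustration.
  final case class Person(name: String, age: Option[Int], tags: Seq[String])

  // Same case order as convertToCatalyst: Option, then Seq, then Product.
  def convert(a: Any): Any = a match {
    case Some(v)    => v                                     // unwrap Option
    case None       => null                                  // missing value becomes null
    case s: Seq[_]  => s.map(convert)                        // convert elements recursively
    case p: Product => p.productIterator.map(convert).toVector // Vector stands in for GenericRow
    case other      => other
  }
}
```

Converting Person("ann", Some(30), Seq("x")) gives a three-column row, with the Option unwrapped to the plain value 30.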
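II. Join Related Operators
HashJoin:
Before looking at the join operators, it helps to understand HashJoin, a trait defined in joins.scala under the execution package. The join operators BroadcastHashJoin, LeftSemiJoinHash and ShuffledHashJoin all implement this trait. (The original post showed a class diagram here.)
The main members of the HashJoin trait are:
buildSide says which side, left or right, is used to build the hash table. It is the base side of the join, not the join type.
leftKeys are the left child's key expressions and rightKeys are the right child's.
left is the left child's physical plan and right is the right child's.
buildSideKeyGenerator is a Projection that evaluates the build side's key expressions against an input Row.
streamSideKeyGenerator is a function that creates a MutableProjection, which evaluates the stream side's key expressions against an input Row.
If buildSide is BuildLeft, the left table is the build side and the right table joined against it is the stream side.
The key operation in HashJoin is joinIterators. Put simply, it joins two tables, treating each one as an Iterator[Row]:
1. Iterate over the build side first, compute the build keys, and use a HashMap to hold entries of the form (buildKeys, ArrayBuffer[Row]).
2. Iterate over the streamed side, compute the streamed key, and look it up in the HashMap to find matches.
3. Produce a joinRow that concatenates the two rows.
See the comments in the code: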
trait HashJoin {
  val leftKeys: Seq[Expression]
  val rightKeys: Seq[Expression]
  val buildSide: BuildSide
  val left: SparkPlan
  val right: SparkPlan
  // pattern match: pair up the physical plans as a Tuple2, (left, right) for BuildLeft, otherwise (right, left)
  lazy val (buildPlan, streamedPlan) = buildSide match {
    case BuildLeft => (left, right)
    case BuildRight => (right, left)
  }
  // pattern match: pair up the key expressions as a Tuple2 in the same way
  lazy val (buildKeys, streamedKeys) = buildSide match {
    case BuildLeft => (leftKeys, rightKeys)
    case BuildRight => (rightKeys, leftKeys)
  }
  def output = left.output ++ right.output
  // evaluates the build-side key expressions against a Row
  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
  // creates the projection that evaluates the stream-side key expressions against a Row
  @transient lazy val streamSideKeyGenerator =
    () => new MutableProjection(streamedKeys, streamedPlan.output)
  // joins the build table's Iterator[Row] with the stream table's Iterator[Row] and returns the joined Iterator[Row]
  def joinIterators(buildIter: Iterator[Row], streamIter: Iterator[Row]): Iterator[Row] = {
    // TODO: Use Spark's HashMap implementation.
    val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]() // matching is done with a HashMap
    var currentRow: Row = null
    // Create a mapping of buildKeys -> rows
    // Only the build iterator is consumed here, producing (rowKey, rows).
    // This resembles word count, except that the values are collected into a list of Rows instead of summed.
    while (buildIter.hasNext) {
      currentRow = buildIter.next()
      val rowKey = buildSideKeyGenerator(currentRow) // compute rowKey, the key of the HashMap
      if(!rowKey.anyNull) {
        val existingMatchList = hashTable.get(rowKey)
        val matchList = if (existingMatchList == null) {
          val newMatchList = new ArrayBuffer[Row]()
          hashTable.put(rowKey, newMatchList) // (rowKey, matchedRowList)
          newMatchList
        } else {
          existingMatchList
        }
        matchList += currentRow.copy() // append a copy of the row to the match list
      }
    }
    new Iterator[Row] { // finally, probe the build-side HashMap with each streamed row's key
      private[this] var currentStreamedRow: Row = _
      private[this] var currentHashMatches: ArrayBuffer[Row] = _
      private[this] var currentMatchPosition: Int = -1
      // Mutable per row objects.
      private[this] val joinRow = new JoinedRow
      private[this] val joinKeys = streamSideKeyGenerator()
      override final def hasNext: Boolean =
        (currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) ||
        (streamIter.hasNext && fetchNext())
      override final def next() = {
        val ret = buildSide match {
          // BuildRight: the streamed row goes on the left, the matched build row on the right
          case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition))
          // BuildLeft: the other way round
          case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow)
        }
        currentMatchPosition += 1
        ret
      }
      /**
       * Searches the streamed iterator for the next row that has at least one match in hashtable.
       *
       * @return true if the search is successful, and false if the streamed iterator runs out of
       * tuples.
       */
      private final def fetchNext(): Boolean = {
        currentHashMatches = null
        currentMatchPosition = -1
        while (currentHashMatches == null && streamIter.hasNext) {
          currentStreamedRow = streamIter.next()
          if (!joinKeys(currentStreamedRow).anyNull) {
            // look up the streamed row's key in the hash table built from the build side
            currentHashMatches = hashTable.get(joinKeys.currentValue)
          }
        }
        if (currentHashMatches == null) {
          false
        } else {
          currentMatchPosition = 0
          true
        }
      }
    }
  }
}
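The build and probe phases of joinIterators can be condensed into a few lines of plain Scala. This is a sketch under simplifying assumptions: Row is a hypothetical alias for Seq[Any], the key extractors are plain functions rather than projections, a Boolean replaces BuildSide, and joined rows are built by concatenation instead of a reusable JoinedRow.

```scala
import scala.collection.mutable

object HashJoinSketch {
  type Row = Seq[Any] // hypothetical stand-in for a Catalyst Row

  def hashJoin(
      build: Iterator[Row],
      stream: Iterator[Row],
      buildKey: Row => Any,
      streamKey: Row => Any,
      buildIsLeft: Boolean): Iterator[Row] = {
    // Build phase: key -> all build rows with that key (null keys never match).
    val table = mutable.HashMap.empty[Any, mutable.ArrayBuffer[Row]]
    build.foreach { row =>
      val k = buildKey(row)
      if (k != null) table.getOrElseUpdate(k, mutable.ArrayBuffer.empty[Row]) += row
    }
    // Probe phase: look up each streamed row and emit one output row per match.
    stream.flatMap { s =>
      val k = streamKey(s)
      val matches: Seq[Row] =
        if (k == null) Seq.empty else table.get(k).map(_.toSeq).getOrElse(Seq.empty)
      // Output columns are always left ++ right, whichever side was the build side.
      matches.map(b => if (buildIsLeft) b ++ s else s ++ b)
    }
  }
}
```

A usage example: with users (id, name) as the build side on the left and orders (userId, item) streamed, each order that has a matching user yields one joined row, and orders with a null or unknown key yield nothing.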
The implementation of joinRow, which joins two Rows end to end. Its copy() method creates a new Array that holds the values of both rows:
class JoinedRow extends Row {
  private[this] var row1: Row = _
  private[this] var row2: Row = _
  .........
  def copy() = {
    val totalSize = row1.size + row2.size
    val copiedValues = new Array[Any](totalSize)
    var i = 0
    while(i < totalSize) {
      copiedValues(i) = apply(i)
      i += 1
    }
    new GenericRow(copiedValues) // return a new, merged Row
  }
}
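The listing elides how apply(i) is resolved. A natural way to do it, assumed here rather than taken from the source, is to dispatch by index to the first or second row, so that no values are copied until copy() is called. JoinedView below is a hypothetical illustration of that idea.

```scala
object JoinedRowSketch {
  // Hypothetical read-only view over two rows; nothing is copied on construction.
  final class JoinedView(left: IndexedSeq[Any], right: IndexedSeq[Any]) {
    def size: Int = left.size + right.size

    // Assumption: indices below left.size go to the left row, the rest to the right row.
    def apply(i: Int): Any =
      if (i < left.size) left(i) else right(i - left.size)

    // Materialise the view into one new, independent row.
    def copy(): Vector[Any] = Vector.tabulate(size)(i => apply(i))
  }
}
```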
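2.1 LeftSemiJoinHash
Left semi join needs little introduction: it is what early versions of Hive used in place of IN and EXISTS. The right table's join keys are put into a HashSet, and then the left table is scanned to check whether each row's join key has a match.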
case class LeftSemiJoinHash(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {
  val buildSide = BuildRight // the right table is the build side
  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
  override def output = left.output
  def execute() = {
    // Executing the right table's physical plan yields an RDD; zipPartitions pairs its partitions
    // with those of the streamed side, and each pair is processed as described above.
    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
      val hashSet = new java.util.HashSet[Row]()
      var currentRow: Row = null
      // Create a Hash set of buildKeys
      while (buildIter.hasNext) {
        currentRow = buildIter.next()
        val rowKey = buildSideKeyGenerator(currentRow)
        if(!rowKey.anyNull) {
          val keyExists = hashSet.contains(rowKey)
          if (!keyExists) {
            hashSet.add(rowKey)
          }
        }
      }
      val joinKeys = streamSideKeyGenerator()
      streamIter.filter(current => {
        !joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)
      })
    }
  }
}
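Compared with an inner hash join, a semi join only needs to know whether a key exists on the right side, so a set of keys is enough and only left columns are output. A minimal plain-Scala sketch follows; the Row alias and key functions are hypothetical.

```scala
import scala.collection.mutable

object SemiJoinSketch {
  type Row = Seq[Any] // hypothetical stand-in for a Catalyst Row

  def leftSemiJoin(
      left: Iterator[Row],
      right: Iterator[Row],
      leftKey: Row => Any,
      rightKey: Row => Any): Iterator[Row] = {
    // Build: remember which keys occur on the right (null keys are skipped).
    val keys = mutable.HashSet.empty[Any]
    right.foreach { r =>
      val k = rightKey(r)
      if (k != null) keys += k
    }
    // Probe: keep a left row if its key occurs on the right.
    left.filter { l =>
      val k = leftKey(l)
      k != null && keys.contains(k)
    }
  }
}
```

Note that a left row appears at most once in the output even when several right rows share its key, which is the difference from an inner join.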
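2.2 BroadcastHashJoin
As the name says, this is a broadcast hash join. It is an implementation of the inner hash join. It uses a future from scala.concurrent to asynchronously execute buildPlan, collect its rows, and broadcast them. Once the broadcast table is available, the rows of streamedPlan are matched against it. The join result is produced with RDD's mapPartitions together with joinIterators from HashJoin.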
case class BroadcastHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan)(@transient sqlContext: SQLContext) extends BinaryNode with HashJoin {
  override def otherCopyArgs = sqlContext :: Nil
  override def outputPartitioning: Partitioning = left.outputPartitioning
  override def requiredChildDistribution =
    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
  @transient
  lazy val broadcastFuture = future { // broadcast the table through SparkContext
    sqlContext.sparkContext.broadcast(buildPlan.executeCollect())
  }
  def execute() = {
    val broadcastRelation = Await.result(broadcastFuture, 5.minute)
    streamedPlan.execute().mapPartitions { streamedIter =>
      joinIterators(broadcastRelation.value.iterator, streamedIter) // run joinIterators on each partition
    }
  }
}
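The lazy future plus Await pattern can be shown in isolation. In this sketch, which does not use Spark, SmallTableJoin and loadBuildSide are hypothetical names, an in-memory Map stands in for the broadcast variable, and Future.apply is used in place of the lowercase future helper seen in the listing.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BroadcastJoinSketch {
  // Counts how often the build side is actually loaded.
  val buildRuns = new AtomicInteger(0)

  // Hypothetical build side: (key, value) pairs small enough to hold in memory.
  final class SmallTableJoin(loadBuildSide: () => Seq[(Int, String)]) {
    // lazy val: the Future is created, and the load starts, on first access only.
    lazy val tableFuture: Future[Map[Int, Seq[String]]] = Future {
      buildRuns.incrementAndGet()
      loadBuildSide().groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }
    }

    def execute(streamKeys: Seq[Int]): Seq[(Int, String)] = {
      // Block until the build side is ready, with an upper bound on the wait.
      val table = Await.result(tableFuture, 5.minutes)
      streamKeys.flatMap(k => table.getOrElse(k, Seq.empty).map(v => (k, v)))
    }
  }
}
```

Calling execute twice on the same instance loads the build side only once, because the lazy val caches the Future.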
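2.3 ShuffledHashJoin
As the name suggests, ShuffledHashJoin needs to shuffle data. Its outputPartitioning is the left child's partitioning. Its requiredChildDistribution is a ClusteredDistribution on the join keys of each child, which is matched against HashPartitioning, so the data is shuffled accordingly. It then uses RDD's zipPartitions method to zip the corresponding partitions of the two sides and joins each pair. Partitioning is not covered in detail here; see partitioning under org.apache.spark.sql.catalyst.plans.physical.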
case class ShuffledHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {
  override def outputPartitioning: Partitioning = left.outputPartitioning
  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
  def execute() = {
    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    }
  }
}
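Zipping partitions pairwise only gives a correct join if equal keys from both sides land in the partition with the same index, which is what clustering both children on their join keys ensures. The sketch below illustrates this in plain Scala under simplifying assumptions: partitionByKey is a hypothetical stand-in for the shuffle, and a nested loop replaces the per-partition hash join to keep the example short.

```scala
object ShuffledJoinSketch {
  type Row = Seq[Any] // hypothetical stand-in for a Catalyst Row

  // Stand-in for the shuffle: route each row to partition (hash of key) mod n.
  def partitionByKey(rows: Seq[Row], key: Row => Any, n: Int): Vector[Seq[Row]] = {
    val groups = rows.groupBy(r => Math.floorMod(key(r).##, n))
    Vector.tabulate(n)(i => groups.getOrElse(i, Seq.empty))
  }

  def shuffledJoin(
      left: Seq[Row],
      right: Seq[Row],
      leftKey: Row => Any,
      rightKey: Row => Any,
      n: Int): Seq[Row] = {
    val l = partitionByKey(left, leftKey, n)
    val r = partitionByKey(right, rightKey, n)
    // Like zipPartitions: partition i of the left only ever meets partition i of the right.
    l.zip(r).flatMap { case (lp, rp) =>
      for (a <- lp; b <- rp if leftKey(a) == rightKey(b)) yield a ++ b
    }
  }
}
```

The set of joined rows does not depend on the number of partitions n; only the output order may change.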
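To be continued :)
Original article. If you repost it, please cite:
Reposted from: OopsOutOfMemory盛利's Blog, author: OopsOutOfMemory
Link to this article: http://blog.csdn.net/oopsoom/article/details/38274621
Note: This article is licensed under Attribution-NonCommercial-NoDerivs 2.5 China Mainland (CC BY-NC-ND 2.5 CN). Reposting, sharing and comments are welcome, but please keep the author attribution and the link to the article. For commercial use or licensing questions, please contact me.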