在使用 Spark SQL 的過程中,經常會用到 groupBy 這個函數進行一些統計工作。但是會發現除了 groupBy 外,還有一個 groupByKey(注意RDD 也有一個 groupByKey,而這裡的 groupByKey 是 DataFrame 的 ) 。這個 groupByKey 引起了我的好奇,那我們就到源碼裡面一探究竟吧。
所用 spark 版本:spark 2.1.0
先從使用的角度來說,
groupBy:groupBy類似于傳統SQL語言中的group by子語句,但比較不同的是groupBy()可以帶多個列名,對多個列進行group。比如想根據 "id" 和 "name" 進行 groupBy 的話可以
df.goupBy("id","name")
groupBy傳回的類型是RelationalGroupedDataset。
groupByKey:groupByKey則更加靈活,可以根據使用者自己對列的組合來進行groupBy,比如上面的那個例子,根據 "id" 和 "name" 進行 groupBy,使用groupByKey可以這樣。
//同前面的goupBy效果是一樣的,但傳回的類型是不一樣的
df..toDF("id","name").goupByKey(row =>{
row.getString(0) + row.getString(1)
})
但和groupBy不同的是groupByKey傳回的類型是KeyValueGroupedDataset。
下面來看看這兩個方法的實作有何差別。
groupBy
def groupBy(cols: Column*): RelationalGroupedDataset = {
RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.GroupByType)
}
最終會去建立一個RelationalGroupedDataset,而這個方法提供count(),max(),agg(),等方法。值得一提的是,這個類在spark1.x的時候類名為“GroupedData”。看看類中的注釋吧
/**
* A set of methods for aggregations on a `DataFrame`, created by `Dataset.groupBy`.
*
* The main method is the agg function, which has multiple variants. This class also contains
* convenience some first order statistics such as mean, sum for convenience.
*
* This class was named `GroupedData` in Spark 1.x.
*
* @since 2.0.0
*/
@InterfaceStability.Stable
class RelationalGroupedDataset protected[sql](
groupByKey
@Experimental
@InterfaceStability.Evolving
def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
val inputPlan = logicalPlan
val withGroupingKey = AppendColumns(func, inputPlan)
val executed = sparkSession.sessionState.executePlan(withGroupingKey)
new KeyValueGroupedDataset(
encoderFor[K],
encoderFor[T],
executed,
inputPlan.output,
withGroupingKey.newColumns)
}
可以發現最後生成和傳回的類是KeyValueGroupedDataset。這是dataset的子類,表示聚合過之後的dataset。
我們再看看這個類中的注釋吧
/**
* :: Experimental ::
* A [[Dataset]] has been logically grouped by a user specified grouping key. Users should not
* construct a [[KeyValueGroupedDataset]] directly, but should instead call `groupByKey` on
* an existing [[Dataset]].
*
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
class KeyValueGroupedDataset[K, V] private[sql](
可以發現 groupByKey 還處于實驗階段。它是希望可以由使用者自己來實作 groupBy 的規則,而不像 groupBy() 一樣,需要被列屬性所束縛。
通過 groupByKey 使用者可以按照自己的需求來進行 grouping 。
總而言之,groupByKey雖然提供了更加靈活的處理 grouping 的方式,但 groupByKey 後傳回的類是 KeyValueGroupedDataset ,它裡面所提供的操作接口也不如 groupBy 傳回的 RelationalGroupedDataset 所提供的接口豐富。除非真的有一些特殊的 grouping 操作,否則還是使用 groupBy 吧。