天天看點

SparkSQL 自定義聚合函數UDAF實戰應用

package sparkSql

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}

/**
 * @Author yqq
 * @Date 2021/12/14 14:34
 * @Version 1.0
 * 與聚合函數同時出現在Select後的字段,需要跟在 group by 後面
 */
object SparkSQLUDAF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("test02").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val list = List[String]("kobe", "james", "kobe", "durant", "kobe","kobe","james","james","durant")
    import session.implicits._
    val frame = list.toDF("name")
    frame.createTempView("t")

    /**
     * 自定義聚合函數UDAF
     */
    session.udf.register("nameCount",new UserDefinedAggregateFunction {
      //調用UDF函數時,傳參的類型
      override def inputSchema: StructType = StructType(List[StructField](
        StructField("name",DataTypes.StringType)
      ))
      //設定在計算過程中,更新的資料類型
      override def bufferSchema: StructType = StructType(List[StructField](
        StructField("name",DataTypes.IntegerType)
      ))
      //指定調用函數最後傳回類型
      override def dataType: DataType = DataTypes.IntegerType
      //多次運作,結果順序保持一緻
      override def deterministic: Boolean = true
      //作用在map,reduce兩側給每個分區内的每個分組的資料做初始值
      override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0,0)
      //作用在map端每個分區的每個分組上
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = buffer.update(0,buffer.getInt(0)+1)
      //作用在reduce端,每個分區的每個分組上,對map的結果做聚合
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = buffer1.update(0,buffer1.getInt(0)+buffer2.getInt(0))
      //調用函數最後傳回的資料結果
      override def evaluate(buffer: Row): Any = buffer.getInt(0)
    })
    session.sql(
      """
        |select name,nameCount(name) as totalCount from t group by name
        |""".stripMargin).show()
  }
}      

繼續閱讀