天天看點

spark的UDF和UDAF用法

UDF(user defined function)

UDF:  輸入一行, 傳回一個結果.  一對一關系     
放入函數一個值, 就傳回一個值, 而不會傳回多個值. 
如下面的例子就可以看出: (x: String) => "Name=" + x     這個函數, 入參為一個, 傳回也是一個, 而不會傳回多個值.
           

來個demo:

In路徑下的user.txt檔案内容如下:

{"name": "zhangsan","age": 20}
{"name": "lisi", "age": 30}
{"name": "wangwu","age":40}
           
// 使用者可以自定義函數
object SparkSql_UDF {
  def main(args: Array[String]): Unit = {
    /**
     * 注意:
     * 如果需要RDD與DF或者DS之間操作,那麼都需要引入 import spark.implicits._  【spark不是包名,而是sparkSession對象的名稱】
     * 前置條件:導入隐式轉換并建立一個RDD
     */


    // 設定spark計算架構的運作(部署) 環境
    val sparkconf = new SparkConf().setMaster("local[*]").setAppName("spark")

    // 建立SparkSql的環境對象
    val spark = SparkSession.builder().config(sparkconf).getOrCreate()

    // 進行轉換之前, 需要引入隐式轉換規則
    // 這裡的spark不是包名的含義, 是SparkSession對象的名字
    import spark.implicits._

    val frame: DataFrame = spark.read.json("in")

    // 向spark中注冊一個addName函數,
    val addName: UserDefinedFunction = spark.udf.register("addName", (x: String) => "Name=" + x)

    // 給DataFrame起一個表名
    frame.createOrReplaceTempView("user")

    val frame1: DataFrame = spark.sql("select addName(name)  bb from user")
    // 展示資料
    frame1.show()
  }
}
           

運作結果:

spark的UDF和UDAF用法

UDTF: 輸入一行, 傳回多行(hive). 一對多的關系. 在sparkSQL中沒有UDTF。在hive中有。在sparkSQL中使用flatMap就可以實作該功能!。 一對多的功能.

UDAF: user defined aggregate function

UDAF: 輸入多行, 傳回一行. aggregate(聚合)   比如: count,sum, avg,  這些是sparkSQL自帶的聚合函數,如果有複雜的業務需求,要自己定義
           

demo:

弱類型的UDAF

SQL的風格

object SparkSql_UDAF_Demo {
  def main(args: Array[String]): Unit = {
    // 設定spark計算架構的運作(部署) 環境
    val sparkconf = new SparkConf().setMaster("local").setAppName("spark")

    // 建立SparkSql的環境對象
    val spark = SparkSession.builder().config(sparkconf).getOrCreate()

    val gm: GenMean = new GenMean

    //注冊聚合函數, 在後面使用就用gm來使用
    spark.udf.register("gm", gm)

    val range: Dataset[lang.Long] = spark.range(1, 11)
    // 給DataFrame起個表名
    range.createTempView("v_range")
    // 使用自定義的聚合函數,  sql風格
    val result: DataFrame = spark.sql("select gm(id) from v_range")
    result.show()

    // 關閉資源
    spark.stop()
  }
}

class GenMean extends UserDefinedAggregateFunction {
  // 輸入資料的類型
  override def inputSchema: StructType = {
    StructType(List(StructField("value", DoubleType)))
  }

  /*
   産生中間結果的資料類型
   相當于每個分區裡要進行計算
  */
  override def bufferSchema: StructType = StructType(List(
    // 相乘之後傳回的積
    StructField("product", DoubleType),
    // 參與運算數字的個數
    StructField("count", LongType)
  ))

  // 最終傳回的結果類型
  override def dataType: DataType = DoubleType

  // 確定一緻性, 一般用true
  override def deterministic: Boolean = true

  /*
   每個分區裡的product和count值要有初始值
    指定初始值
  */
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // 相乘的初始值
    buffer(0) = 1.0
    // 參與運算數字的個數的初始值
    buffer(1) = 0L
  }

  // 每有一條資料參與運算就更新一下中間結果(update相當于在每一個分區中的運算)
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // 每有一個數字參與運算就進行相乘(包含了之前的中間結果)
    buffer(0) = buffer.getDouble(0) + input.getDouble(0)
    // 參與運算資料的個數也有在更新(也包含了之前的中間結果)
    buffer(1) = buffer.getLong(1) + 1L
  }

  // 全局聚合, 每個分區來進行聚合運算
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // 每個分區計算的結果進行相乘
    buffer1(0) = buffer1.getDouble(0) * buffer2.getDouble(0)
    // 每個分區參與預算的中間結果進行相加
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  /*
   計算最終的結果
   通過merge方法後, 各個分區的資料都相乘和累加了.  接下來就是将各個分區的資料統計出來的結果來
   進行求出幾何平均數.
  */
  override def evaluate(buffer: Row): Any = {
    math.pow(buffer.getDouble(0), 1.toDouble / buffer.getLong(1))
  }
}
           

運作結果:

spark的UDF和UDAF用法

可以畫個大概的圖來了解UDAF:

spark的UDF和UDAF用法

DSL的風格:

object SparkSql_UDAF_Demo {
  def main(args: Array[String]): Unit = {
    // 設定spark計算架構的運作(部署) 環境
    val sparkconf = new SparkConf().setMaster("local").setAppName("spark")

    // 建立SparkSql的環境對象
    val spark = SparkSession.builder().config(sparkconf).getOrCreate()
    //建立聚合函數對象
    val gm: GenMean = new GenMean
    
    val range: Dataset[lang.Long] = spark.range(1, 11)
 
    import spark.implicits._
    // DSL的風格, 不需要注冊聚合函數
    val result1: DataFrame = range.agg(gm($"id").as("geomean"))
    result1.show()
    // 關閉資源
    spark.stop()
  }
}

class GenMean extends UserDefinedAggregateFunction {
  // 輸入資料的類型
  override def inputSchema: StructType = {
    StructType(List(StructField("value", DoubleType)))
  }

  /*
   産生中間結果的資料類型
   相當于每個分區裡要進行計算
  */
  override def bufferSchema: StructType = StructType(List(
    // 相乘之後傳回的積
    StructField("product", DoubleType),
    // 參與運算數字的個數
    StructField("count", LongType)
  ))

  // 最終傳回的結果類型
  override def dataType: DataType = DoubleType

  // 確定一緻性, 一般用true
  override def deterministic: Boolean = true

  /*
   每個分區裡的product和count值要有初始值
    指定初始值
  */
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // 相乘的初始值
    buffer(0) = 1.0
    // 參與運算數字的個數的初始值
    buffer(1) = 0L
  }

  // 每有一條資料參與運算就更新一下中間結果(update相當于在每一個分區中的運算)
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // 每有一個數字參與運算就進行相乘(包含了之前的中間結果)
    buffer(0) = buffer.getDouble(0) + input.getDouble(0)
    // 參與運算資料的個數也有在更新(也包含了之前的中間結果)
    buffer(1) = buffer.getLong(1) + 1L
  }

  // 全局聚合, 每個分區來進行聚合運算
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // 每個分區計算的結果進行相乘
    buffer1(0) = buffer1.getDouble(0) * buffer2.getDouble(0)
    // 每個分區參與預算的中間結果進行相加
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  /*
   計算最終的結果
   通過merge方法後, 各個分區的資料都相乘和累加了.  接下來就是将各個分區的資料統計出來的結果來
   進行求出幾何平均數.
  */
  override def evaluate(buffer: Row): Any = {
    math.pow(buffer.getDouble(0), 1.toDouble / buffer.getLong(1))
  }
}
           

運作結果:

spark的UDF和UDAF用法

強類型的UDAF

繼承Aggregator抽象類

// 使用者可以自定義聚合函數(強類型)
object SparkSql_UDAF_Class {
  def main(args: Array[String]): Unit = {

    /**
     * 注意:
     * 如果需要RDD與DF或者DS之間操作,那麼都需要引入 import spark.implicits._  【spark不是包名,而是sparkSession對象的名稱】
     * 前置條件:導入隐式轉換并建立一個RDD
     */


    // 設定spark計算架構的運作(部署) 環境
    val sparkconf = new SparkConf().setMaster("local[*]").setAppName("spark")

    // 建立SparkSql的環境對象
    val spark = SparkSession.builder().config(sparkconf).getOrCreate()

    // 進行轉換之前, 需要引入隐式轉換規則
    // 這裡的spark不是包名的含義, 是SparkSession對象的名字
    import spark.implicits._

    // 讀取資料
    val frame: DataFrame = spark.read.json("in")
    frame.show()

    // 建立聚合函數對象
    val function = new MyAgeAvgClassFunction

    // 将聚合函數轉換為查詢列
    val col: TypedColumn[UserBean, Double] = function.toColumn.name("shaojunjun")

    // 将DataFrame轉換為DataSet
    val value1: Dataset[UserBean] = frame.as[UserBean]
    // 将轉換的查詢列放入到DSL風格文法裡
    value1.select(col).show()

    spark.close()

  }
}
//聲明使用者自定義聚合函數(強類型)
// 1. 繼承 Aggregator類, 設定泛型
// 2. 實作方法

// 樣例類中的屬性預設是val, 隻讀
case class UserBean(name: String, age: BigInt)
case class AvgBuffer(var sum: BigInt, var count: Int)

class MyAgeAvgClassFunction extends Aggregator[UserBean, AvgBuffer, Double]{
  // 初始化
  override def zero: AvgBuffer = {
    // 建構緩沖區對象
    AvgBuffer(0,0)
  }

  //聚合資料
  override def reduce(b: AvgBuffer, a: UserBean): AvgBuffer = {
    b.sum = b.sum + a.age
    b.count = b.count + 1
    b
  }

  // 緩沖區的合并
  override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
    b1.sum = b1.sum + b2.sum
    b1.count = b1.count + b2.count

    b1
  }

  // 完成計算
  override def finish(reduction: AvgBuffer): Double = {
    reduction.sum.toDouble / reduction.count
  }

  // 自定義類型寫  Encoders.product
  override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product

  // 基本類型寫  Encoders.scalaDouble
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

           

繼續閱讀