UDF(user defined function)
UDF: 輸入一行, 傳回一個結果. 一對一關系
放入函數一個值, 就傳回一個值, 而不會傳回多個值.
如下面的例子就可以看出: (x: String) => "Name=" + x 這個函數, 入參為一個, 傳回也是一個, 而不會傳回多個值.
來個demo:
In路徑下的user.txt檔案内容如下:
{"name": "zhangsan","age": 20}
{"name": "lisi", "age": 30}
{"name": "wangwu","age":40}
// 使用者可以自定義函數
object SparkSql_UDF {
def main(args: Array[String]): Unit = {
/**
* 注意:
* 如果需要RDD與DF或者DS之間操作,那麼都需要引入 import spark.implicits._ 【spark不是包名,而是sparkSession對象的名稱】
* 前置條件:導入隐式轉換并建立一個RDD
*/
// 設定spark計算架構的運作(部署) 環境
val sparkconf = new SparkConf().setMaster("local[*]").setAppName("spark")
// 建立SparkSql的環境對象
val spark = SparkSession.builder().config(sparkconf).getOrCreate()
// 進行轉換之前, 需要引入隐式轉換規則
// 這裡的spark不是包名的含義, 是SparkSession對象的名字
import spark.implicits._
val frame: DataFrame = spark.read.json("in")
// 向spark中注冊一個addName函數,
val addName: UserDefinedFunction = spark.udf.register("addName", (x: String) => "Name=" + x)
// 給DataFrame起一個表名
frame.createOrReplaceTempView("user")
val frame1: DataFrame = spark.sql("select addName(name) bb from user")
// 展示資料
frame1.show()
}
}
運作結果:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIn5GcugjM3UDOwkDM0EzMwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
UDTF: 輸入一行, 傳回多行(hive). 一對多的關系. 在sparkSQL中沒有UDTF。在hive中有。在sparkSQL中使用flatMap就可以實作該功能!。 一對多的功能.
UDAF: user defined aggregate function
UDAF: 輸入多行, 傳回一行. aggregate(聚合) 比如: count,sum, avg, 這些是sparkSQL自帶的聚合函數,如果有複雜的業務需求,要自己定義
demo:
弱類型的UDAF
SQL的風格
object SparkSql_UDAF_Demo {
def main(args: Array[String]): Unit = {
// 設定spark計算架構的運作(部署) 環境
val sparkconf = new SparkConf().setMaster("local").setAppName("spark")
// 建立SparkSql的環境對象
val spark = SparkSession.builder().config(sparkconf).getOrCreate()
val gm: GenMean = new GenMean
//注冊聚合函數, 在後面使用就用gm來使用
spark.udf.register("gm", gm)
val range: Dataset[lang.Long] = spark.range(1, 11)
// 給DataFrame起個表名
range.createTempView("v_range")
// 使用自定義的聚合函數, sql風格
val result: DataFrame = spark.sql("select gm(id) from v_range")
result.show()
// 關閉資源
spark.stop()
}
}
class GenMean extends UserDefinedAggregateFunction {
// 輸入資料的類型
override def inputSchema: StructType = {
StructType(List(StructField("value", DoubleType)))
}
/*
産生中間結果的資料類型
相當于每個分區裡要進行計算
*/
override def bufferSchema: StructType = StructType(List(
// 相乘之後傳回的積
StructField("product", DoubleType),
// 參與運算數字的個數
StructField("count", LongType)
))
// 最終傳回的結果類型
override def dataType: DataType = DoubleType
// 確定一緻性, 一般用true
override def deterministic: Boolean = true
/*
每個分區裡的product和count值要有初始值
指定初始值
*/
override def initialize(buffer: MutableAggregationBuffer): Unit = {
// 相乘的初始值
buffer(0) = 1.0
// 參與運算數字的個數的初始值
buffer(1) = 0L
}
// 每有一條資料參與運算就更新一下中間結果(update相當于在每一個分區中的運算)
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
// 每有一個數字參與運算就進行相乘(包含了之前的中間結果)
buffer(0) = buffer.getDouble(0) + input.getDouble(0)
// 參與運算資料的個數也有在更新(也包含了之前的中間結果)
buffer(1) = buffer.getLong(1) + 1L
}
// 全局聚合, 每個分區來進行聚合運算
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
// 每個分區計算的結果進行相乘
buffer1(0) = buffer1.getDouble(0) * buffer2.getDouble(0)
// 每個分區參與預算的中間結果進行相加
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
/*
計算最終的結果
通過merge方法後, 各個分區的資料都相乘和累加了. 接下來就是将各個分區的資料統計出來的結果來
進行求出幾何平均數.
*/
override def evaluate(buffer: Row): Any = {
math.pow(buffer.getDouble(0), 1.toDouble / buffer.getLong(1))
}
}
運作結果:
可以畫個大概的圖來了解UDAF:
DSL的風格:
object SparkSql_UDAF_Demo {
def main(args: Array[String]): Unit = {
// 設定spark計算架構的運作(部署) 環境
val sparkconf = new SparkConf().setMaster("local").setAppName("spark")
// 建立SparkSql的環境對象
val spark = SparkSession.builder().config(sparkconf).getOrCreate()
//建立聚合函數對象
val gm: GenMean = new GenMean
val range: Dataset[lang.Long] = spark.range(1, 11)
import spark.implicits._
// DSL的風格, 不需要注冊聚合函數
val result1: DataFrame = range.agg(gm($"id").as("geomean"))
result1.show()
// 關閉資源
spark.stop()
}
}
class GenMean extends UserDefinedAggregateFunction {
// 輸入資料的類型
override def inputSchema: StructType = {
StructType(List(StructField("value", DoubleType)))
}
/*
産生中間結果的資料類型
相當于每個分區裡要進行計算
*/
override def bufferSchema: StructType = StructType(List(
// 相乘之後傳回的積
StructField("product", DoubleType),
// 參與運算數字的個數
StructField("count", LongType)
))
// 最終傳回的結果類型
override def dataType: DataType = DoubleType
// 確定一緻性, 一般用true
override def deterministic: Boolean = true
/*
每個分區裡的product和count值要有初始值
指定初始值
*/
override def initialize(buffer: MutableAggregationBuffer): Unit = {
// 相乘的初始值
buffer(0) = 1.0
// 參與運算數字的個數的初始值
buffer(1) = 0L
}
// 每有一條資料參與運算就更新一下中間結果(update相當于在每一個分區中的運算)
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
// 每有一個數字參與運算就進行相乘(包含了之前的中間結果)
buffer(0) = buffer.getDouble(0) + input.getDouble(0)
// 參與運算資料的個數也有在更新(也包含了之前的中間結果)
buffer(1) = buffer.getLong(1) + 1L
}
// 全局聚合, 每個分區來進行聚合運算
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
// 每個分區計算的結果進行相乘
buffer1(0) = buffer1.getDouble(0) * buffer2.getDouble(0)
// 每個分區參與預算的中間結果進行相加
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
/*
計算最終的結果
通過merge方法後, 各個分區的資料都相乘和累加了. 接下來就是将各個分區的資料統計出來的結果來
進行求出幾何平均數.
*/
override def evaluate(buffer: Row): Any = {
math.pow(buffer.getDouble(0), 1.toDouble / buffer.getLong(1))
}
}
運作結果:
強類型的UDAF
繼承Aggregator抽象類
// 使用者可以自定義聚合函數(強類型)
object SparkSql_UDAF_Class {
def main(args: Array[String]): Unit = {
/**
* 注意:
* 如果需要RDD與DF或者DS之間操作,那麼都需要引入 import spark.implicits._ 【spark不是包名,而是sparkSession對象的名稱】
* 前置條件:導入隐式轉換并建立一個RDD
*/
// 設定spark計算架構的運作(部署) 環境
val sparkconf = new SparkConf().setMaster("local[*]").setAppName("spark")
// 建立SparkSql的環境對象
val spark = SparkSession.builder().config(sparkconf).getOrCreate()
// 進行轉換之前, 需要引入隐式轉換規則
// 這裡的spark不是包名的含義, 是SparkSession對象的名字
import spark.implicits._
// 讀取資料
val frame: DataFrame = spark.read.json("in")
frame.show()
// 建立聚合函數對象
val function = new MyAgeAvgClassFunction
// 将聚合函數轉換為查詢列
val col: TypedColumn[UserBean, Double] = function.toColumn.name("shaojunjun")
// 将DataFrame轉換為DataSet
val value1: Dataset[UserBean] = frame.as[UserBean]
// 将轉換的查詢列放入到DSL風格文法裡
value1.select(col).show()
spark.close()
}
}
//聲明使用者自定義聚合函數(強類型)
// 1. 繼承 Aggregator類, 設定泛型
// 2. 實作方法
// 樣例類中的屬性預設是val, 隻讀
case class UserBean(name: String, age: BigInt)
case class AvgBuffer(var sum: BigInt, var count: Int)
class MyAgeAvgClassFunction extends Aggregator[UserBean, AvgBuffer, Double]{
// 初始化
override def zero: AvgBuffer = {
// 建構緩沖區對象
AvgBuffer(0,0)
}
//聚合資料
override def reduce(b: AvgBuffer, a: UserBean): AvgBuffer = {
b.sum = b.sum + a.age
b.count = b.count + 1
b
}
// 緩沖區的合并
override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
b1.sum = b1.sum + b2.sum
b1.count = b1.count + b2.count
b1
}
// 完成計算
override def finish(reduction: AvgBuffer): Double = {
reduction.sum.toDouble / reduction.count
}
// 自定義類型寫 Encoders.product
override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
// 基本類型寫 Encoders.scalaDouble
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}