簡介: 無論Hive還是SparkSQL分析處理資料時,往往需要使用函數,SparkSQL子產品本身自帶很多實作公共功能的函數,在org.apache.spark.sql.functions中。SparkSQL與Hive一樣支援定義函數:UDF和UDAF,尤其是UDF函數在實際項目中使用最為廣泛。
1. SparkSQL與Hive的自定義函數對比
Hive的三種自定義函數:
- 第一種:UDF(User-Defined-Function) 函數
- 一對一的關系,輸入一個值經過函數以後輸出一個值;
- 在Hive中繼承UDF類,方法名稱為evaluate,傳回值不能為void,其實就是實作一個方法;
- 第二種:UDAF(User-Defined Aggregation Function) 聚合函數
- 多對一的關系,輸入多個值輸出一個值,通常與groupBy聯合使用;
- 第三種:UDTF(User-Defined Table-Generating Functions) 函數
- 一對多的關系,輸入一個值輸出多個值(一行變為多行);
- 使用者自定義生成函數,有點像flatMap;
SparkSQL的自定義函數:
在SparkSQL中,目前僅僅支援UDF函數和UDAF函數
- UDF函數:一對一關系(用的最多);
-
UDAF函數:聚合函數,通常與group by 分組函數連用,多對一關系;
由于SparkSQL資料分析有兩種方式:DSL程式設計和SQL程式設計,是以定義UDF函數也有兩種方式,不同方式可以在不同分析中使用。
2. SparkSQL的UDF函數使用
使用SparkSession中udf方法定義和注冊函數,在SQL中使用,使用如下方式定義:

代碼示範:自定義UDF函數
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
/**
* @author liu a fu
* @date 2021/1/18 0018
* @version 1.0
* @DESC Spark的自定函數 UDF
*/
case class Smaller(line: String)
object _14udf {
def main(args: Array[String]): Unit = {
//1-準備環境
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[8]")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
//從rdd轉化到df需要導入隐式轉換
import spark.implicits._
//讀取資料源檔案
val fileRDD: RDD[String] = spark.sparkContext.textFile("data/input/udf/udf.txt")
val valueDF: DataFrame = fileRDD.map(x => Smaller(x)).toDF()
valueDF.show()
valueDF.printSchema()
//3-需求1:實作文本檔案的内容轉化為全部大寫
//udf中一行資料進來,轉化為大寫後輸出
spark.udf.register("smallerToBigger1",(line:String ) =>{
line.toUpperCase()
})
//SQL:
valueDF.createOrReplaceTempView("table")
spark.sql("select line,smallerToBigger1(line) as smaller from table").show()
/**
* root
* |-- line: string (nullable = true)
*
* +-----+-------+
* | line|smaller|
* +-----+-------+
* |Hello| HELLO|
* |Spark| SPARK|
* |Flink| FLINK|
* +-----+-------+
*/
//需求2: 求 value的平方值
val df: DataFrame = Seq(("id1", 1), ("id2", 5), ("id3", 10)).toDF("id", "value")
spark.udf.register("udf1",(x:Int)=>{
x * x
})
df.select($"id",callUDF("udf1",$"value").as("count1")).show()
/**
* +---+------+
* | id|count1|
* +---+------+
* |id1| 1|
* |id2| 25|
* |id3| 100|
* +---+------+
*/
//需求3: value1 value2相加
val df1: DataFrame = Seq(("id1", 12, 13), ("id2", 12, 12), ("id3", 33, 33)).toDF("id", "value1","value2")
spark.udf.register("udf2",(x1:Int,x2:Int)=>{
x1 + x2
})
df1.select($"id",callUDF("udf2",$"value1",$"value2").as("sum")).show()
/**
* +---+---+
* | id|sum|
* +---+---+
* |id1| 25|
* |id2| 24|
* |id3| 66|
* +---+---+
*/
}
}