简介: 无论Hive还是SparkSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在org.apache.spark.sql.functions中。SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。
1. SparkSQL与Hive的自定义函数对比
Hive的三种自定义函数:
- 第一种:UDF(User-Defined-Function) 函数
- 一对一的关系,输入一个值经过函数以后输出一个值;
- 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;
- 第二种:UDAF(User-Defined Aggregation Function) 聚合函数
- 多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;
- 第三种:UDTF(User-Defined Table-Generating Functions) 函数
- 一对多的关系,输入一个值输出多个值(一行变为多行);
- 用户自定义生成函数,有点像flatMap;
SparkSQL的自定义函数:
在SparkSQL中,目前仅仅支持UDF函数和UDAF函数
- UDF函数:一对一关系(用的最多);
-
UDAF函数:聚合函数,通常与group by 分组函数连用,多对一关系;
由于SparkSQL数据分析有两种方式:DSL编程和SQL编程,所以定义UDF函数也有两种方式,不同方式可以在不同分析中使用。
2. SparkSQL的UDF函数使用
使用SparkSession中udf方法定义和注册函数,在SQL中使用,使用如下方式定义:

代码演示:自定义UDF函数
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
/**
* @author liu a fu
* @date 2021/1/18 0018
* @version 1.0
* @DESC Spark的自定函数 UDF
*/
case class Smaller(line: String)
object _14udf {
def main(args: Array[String]): Unit = {
//1-准备环境
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[8]")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
//从rdd转化到df需要导入隐式转换
import spark.implicits._
//读取数据源文件
val fileRDD: RDD[String] = spark.sparkContext.textFile("data/input/udf/udf.txt")
val valueDF: DataFrame = fileRDD.map(x => Smaller(x)).toDF()
valueDF.show()
valueDF.printSchema()
//3-需求1:实现文本文件的内容转化为全部大写
//udf中一行数据进来,转化为大写后输出
spark.udf.register("smallerToBigger1",(line:String ) =>{
line.toUpperCase()
})
//SQL:
valueDF.createOrReplaceTempView("table")
spark.sql("select line,smallerToBigger1(line) as smaller from table").show()
/**
* root
* |-- line: string (nullable = true)
*
* +-----+-------+
* | line|smaller|
* +-----+-------+
* |Hello| HELLO|
* |Spark| SPARK|
* |Flink| FLINK|
* +-----+-------+
*/
//需求2: 求 value的平方值
val df: DataFrame = Seq(("id1", 1), ("id2", 5), ("id3", 10)).toDF("id", "value")
spark.udf.register("udf1",(x:Int)=>{
x * x
})
df.select($"id",callUDF("udf1",$"value").as("count1")).show()
/**
* +---+------+
* | id|count1|
* +---+------+
* |id1| 1|
* |id2| 25|
* |id3| 100|
* +---+------+
*/
//需求3: value1 value2相加
val df1: DataFrame = Seq(("id1", 12, 13), ("id2", 12, 12), ("id3", 33, 33)).toDF("id", "value1","value2")
spark.udf.register("udf2",(x1:Int,x2:Int)=>{
x1 + x2
})
df1.select($"id",callUDF("udf2",$"value1",$"value2").as("sum")).show()
/**
* +---+---+
* | id|sum|
* +---+---+
* |id1| 25|
* |id2| 24|
* |id3| 66|
* +---+---+
*/
}
}