天天看點

SparkSQL重點知識之自定義UDF函數

簡介: 無論Hive還是SparkSQL分析處理資料時,往往需要使用函數,SparkSQL子產品本身自帶很多實作公共功能的函數,在org.apache.spark.sql.functions中。SparkSQL與Hive一樣支援定義函數:UDF和UDAF,尤其是UDF函數在實際項目中使用最為廣泛。

1. SparkSQL與Hive的自定義函數對比

Hive的三種自定義函數:

  • 第一種:UDF(User-Defined-Function) 函數
  1. 一對一的關系,輸入一個值經過函數以後輸出一個值;
  2. 在Hive中繼承UDF類,方法名稱為evaluate,傳回值不能為void,其實就是實作一個方法;
  • 第二種:UDAF(User-Defined Aggregation Function) 聚合函數
  1. 多對一的關系,輸入多個值輸出一個值,通常與groupBy聯合使用;
  • 第三種:UDTF(User-Defined Table-Generating Functions) 函數
  1. 一對多的關系,輸入一個值輸出多個值(一行變為多行);
  2. 使用者自定義生成函數,有點像flatMap;

SparkSQL的自定義函數:

在SparkSQL中,目前僅僅支援UDF函數和UDAF函數

  • UDF函數:一對一關系(用的最多);
  • UDAF函數:聚合函數,通常與group by 分組函數連用,多對一關系;

    由于SparkSQL資料分析有兩種方式:DSL程式設計和SQL程式設計,是以定義UDF函數也有兩種方式,不同方式可以在不同分析中使用。

2. SparkSQL的UDF函數使用

使用SparkSession中udf方法定義和注冊函數,在SQL中使用,使用如下方式定義:

SparkSQL重點知識之自定義UDF函數

代碼示範:自定義UDF函數

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

/**
 * @author liu a fu
 * @date 2021/1/18 0018
 * @version 1.0
 * @DESC  Spark的自定函數  UDF
 */
case class Smaller(line: String)

object _14udf {
  def main(args: Array[String]): Unit = {
    //1-準備環境
    val spark: SparkSession = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[8]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    //從rdd轉化到df需要導入隐式轉換
    import spark.implicits._

    //讀取資料源檔案
    val fileRDD: RDD[String] = spark.sparkContext.textFile("data/input/udf/udf.txt")
    val valueDF: DataFrame = fileRDD.map(x => Smaller(x)).toDF()
    valueDF.show()
    valueDF.printSchema()

    //3-需求1:實作文本檔案的内容轉化為全部大寫
    //udf中一行資料進來,轉化為大寫後輸出
    spark.udf.register("smallerToBigger1",(line:String ) =>{
      line.toUpperCase()
    })

    //SQL:
    valueDF.createOrReplaceTempView("table")
    spark.sql("select line,smallerToBigger1(line) as smaller from table").show()
    /**
     * root
     * |-- line: string (nullable = true)
     *
     * +-----+-------+
     * | line|smaller|
     * +-----+-------+
     * |Hello|  HELLO|
     * |Spark|  SPARK|
     * |Flink|  FLINK|
     * +-----+-------+
     */

    //需求2: 求 value的平方值
    val df: DataFrame = Seq(("id1", 1), ("id2", 5), ("id3", 10)).toDF("id", "value")
    spark.udf.register("udf1",(x:Int)=>{
      x * x
    })
    df.select($"id",callUDF("udf1",$"value").as("count1")).show()
    /**
     * +---+------+
     * | id|count1|
     * +---+------+
     * |id1|     1|
     * |id2|    25|
     * |id3|   100|
     * +---+------+
     */

    //需求3: value1 value2相加
    val df1: DataFrame = Seq(("id1", 12, 13), ("id2", 12, 12), ("id3", 33, 33)).toDF("id", "value1","value2")
    spark.udf.register("udf2",(x1:Int,x2:Int)=>{
      x1 + x2
    })
    df1.select($"id",callUDF("udf2",$"value1",$"value2").as("sum")).show()

    /**
     * +---+---+
     * | id|sum|
     * +---+---+
     * |id1| 25|
     * |id2| 24|
     * |id3| 66|
     * +---+---+
     */
  }
}
           

繼續閱讀