天天看點

Scala005-DataFrame中使用UDF

在處理spark.DataFrame時,經常會用到udf,簡單做些總結和筆記。

構造資料

import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}      
Intitializing Scala interpreter ...



Spark Web UI available at 11111111111:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1598929668275)
SparkSession available as 'spark'






import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}      
// Builder for the SparkSession used by the rest of these notes:
// app name "learningScala", master local[*], Kryo serialization, longer
// heartbeat/network/broadcast timeouts, and a few Spark SQL settings.
// NOTE(review): autoBroadcastJoinThreshold = -1 presumably disables automatic
// broadcast joins — confirm against the Spark SQL configuration docs.
val builder = SparkSession
      .builder()
      .appName("learningScala")
      .config("spark.executor.heartbeatInterval","60s")
      .config("spark.network.timeout","120s")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryoserializer.buffer.max","512m")
      .config("spark.dynamicAllocation.enabled", false)
      .config("spark.sql.inMemoryColumnarStorage.compressed", true)
      .config("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
      .config("spark.sql.broadcastTimeout", 600)
      .config("spark.sql.autoBroadcastJoinThreshold", -1)
      .config("spark.sql.crossJoin.enabled", true)
      .master("local[*]") 
// NOTE(review): the kernel banner shows a SparkSession is already bound to
// `spark` before this cell runs; getOrCreate() may hand back that existing
// session, in which case some of the settings above (e.g. spark.serializer,
// master, appName) would presumably not take effect — confirm.
val spark = builder.getOrCreate()
// Set the SparkContext log level to ERROR.
spark.sparkContext.setLogLevel("ERROR")      
builder: org.apache.spark.sql.SparkSession.Builder = org.apache.spark.sql.SparkSession$Builder@64837d8
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@542c0943      
// Sample data: one string id column plus three integer columns (x, y, z).
// Declared as a val because the DataFrame is never reassigned in these notes.
val df = Seq(
  ("A", 1, 4, 7),
  ("B", 2, 5, 8),
  ("C", 3, 6, 9)
).toDF("id", "x", "y", "z")
df.show(truncate = false)
+---+---+---+---+
|id |x  |y  |z  |
+---+---+---+---+
|A  |1  |4  |7  |
|B  |2  |5  |8  |
|C  |3  |6  |9  |
+---+---+---+---+






df: org.apache.spark.sql.DataFrame = [id: string, x: int ... 2 more fields]      
// Print df's schema as a tree: column names, types and nullability.
df.printSchema()      
root
 |-- id: string (nullable = true)
 |-- x: integer (nullable = false)
 |-- y: integer (nullable = false)
 |-- z: integer (nullable = false)      

方法一

用 udf(...) 把一般函數包裝成 UserDefinedFunction。包裝後可以直接在 DataFrame API(例如 withColumn)中使用,但因為沒有以名稱註冊,所以不能在 spark.sql 的 SQL 語句中呼叫

/** Returns the sum of its two arguments.
  *
  * Note: despite the name, this does not add one to anything; it adds the
  * two inputs together. The name is kept because other cells refer to it.
  *
  * @param useCol1 first addend
  * @param useCol2 second addend
  * @return `useCol1 + useCol2`
  */
def add_one(useCol1: Int, useCol2: Int): Int = {
    useCol1 + useCol2
}
add_one: (useCol1: Int, useCol2: Int)Int      
import org.apache.spark.sql.functions.{udf,col}
// Wrap add_one as a column-level UDF (eta-expanded to a Function2) for use
// with the DataFrame API.
val add_one_udf = udf(add_one _)
import org.apache.spark.sql.functions.{udf, col}
add_one_udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,Some(List(IntegerType, IntegerType)))      
// Append a "sum" column computed by add_one_udf from columns y and z, then display it.
val summedWithUdf = df.withColumn("sum", add_one_udf(col("y"), col("z")))
summedWithUdf.show(truncate = false)
+---+---+---+---+---+
|id |x  |y  |z  |sum|
+---+---+---+---+---+
|A  |1  |4  |7  |11 |
|B  |2  |5  |8  |13 |
|C  |3  |6  |9  |15 |
+---+---+---+---+---+      

方法二

透過 spark.udf.register 以名稱註冊函數。註冊後既可以在 DataFrame 中用 callUDF 依名稱呼叫,也可以直接在 spark.sql 的 SQL 語句中使用

// Register add_one under the name "add_one_udf2" so it can be looked up by name.
spark.udf.register("add_one_udf2", (a: Int, b: Int) => add_one(a, b))
res16: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,Some(List(IntegerType, IntegerType)))      
import org.apache.spark.sql.functions
// Invoke the registered UDF by name from the DataFrame API and display the result.
val summedWithRegistered =
  df.withColumn("sum", functions.callUDF("add_one_udf2", col("y"), col("z")))
summedWithRegistered.show(truncate = false)
+---+---+---+---+---+
|id |x  |y  |z  |sum|
+---+---+---+---+---+
|A  |1  |4  |7  |11 |
|B  |2  |5  |8  |13 |
|C  |3  |6  |9  |15 |
+---+---+---+---+---+

import org.apache.spark.sql.functions      
// Expose df to SQL as the temp view "df", then call the registered UDF from a SQL query.
df.createOrReplaceTempView("df")
val sumQuery = "select *,add_one_udf2(y,z) AS sum  from df"
spark.sql(sumQuery).show()
+---+---+---+---+---+
| id|  x|  y|  z|sum|
+---+---+---+---+---+
|  A|  1|  4|  7| 11|
|  B|  2|  5|  8| 13|
|  C|  3|  6|  9| 15|
+---+---+---+---+---+      

繼續閱讀