寫在前面:
工作中發現大家經常寫UDF,偶爾寫UDAF,但幾乎很少有人寫UDTF。通常UDF、UDAF就能滿足大多的工作需求了。
當然有很多同僚喜歡寫map來代替寫UDF,這是可以達到同樣效果的。map的功能很強大。
你會發現flatMap的功能也很強大。靈活使用它就完成可以實作寫UDTF達到的效果。是以,這大概也是看同僚代碼的時候可以經常看到flatMap,而很少見到寫UDTF的原因吧。
當然還有使用mapPartitions()的方法來達到UDAF同樣效果的寫法。我們在下一篇中介紹。
直接上代碼來執行個體感受下flatMap達到UDTF功能效果的實作。
執行個體一:使用可變的listBuffer來緩存需要輸出的多列資料
//in scala
import org.apache.spark.sql.SparkSession
object udtf {
def main(args:Array[String])={
//因為我用了本地core來運作,是以master設定了local模式
val spark = SparkSession.builder().appName("flatMap-UDTF-Test").master("local[2]")
.enableHiveSupport()
.getOrCreate()
val df =spark.createDataFrame(
Seq((1,"Hello world"),(2,"Toby Gao")
)).toDF("id","words")
df.show()
import spark.implicits._
val result = df.flatMap(row => {
val list = scala.collection.mutable.ListBuffer[(Int,String,String)]() //這裡不能用case class ,用tuple來定義ListBuffer中的變量類型可以
val id = row.getAs[Int]("id")
val words = row.getAs[String]("words").split(" ")
for (word <- words) {
list.append((id, word,"flatMap"))
}
list
}).toDF("id","word","flag")
result.printSchema()
result.show(false)
}
}
解讀:
輸入的DF是2行*2列的:

輸出的DF的Schema是:
輸出的DF結果是:4行*3列的
執行個體二:使用zip将需要輸出的多列資料對應起來
//in scala
package com.jd.datamill9n.goal.metrics.JD
import org.apache.spark.sql.SparkSession
object udtf {
def main(args:Array[String])={
//因為我用了本地core來運作,是以master設定了local模式
val spark = SparkSession.builder().appName("flatMap-UDTF-Test").master("local[2]")
.enableHiveSupport()
.getOrCreate()
val df =spark.createDataFrame(
Seq((1,"Hello world"),(2,"Toby Gao")
)).toDF("id","words")
df.show()
//注意import進來implicits
import spark.implicits._
//使用zip将需要輸出的多列對應起來
df.flatMap{row =>{val id=row.getAs[Int]("id")
val words = row.getAs[String]("words")
Array(id,id).zip(words.split(" "))}
}.toDF("id","word").show()
}
}