天天看點

Scala使用flatMap來實作UDTF的功能效果

寫在前面:

工作中發現大家經常寫UDF,偶爾寫UDAF,但幾乎很少有人寫UDTF。通常UDF、UDAF就能滿足大多的工作需求了。

當然有很多同僚喜歡寫map來代替寫UDF,這是可以達到同樣效果的。map的功能很強大。

你會發現flatMap的功能也很強大。靈活使用它就完成可以實作寫UDTF達到的效果。是以,這大概也是看同僚代碼的時候可以經常看到flatMap,而很少見到寫UDTF的原因吧。

當然還有使用mapPartitions()的方法來達到UDAF同樣效果的寫法。我們在下一篇中介紹。

直接上代碼來執行個體感受下flatMap達到UDTF功能效果的實作。

執行個體一:使用可變的listBuffer來緩存需要輸出的多列資料

//in scala

import org.apache.spark.sql.SparkSession

object udtf {
  def main(args:Array[String])={
    //因為我用了本地core來運作,是以master設定了local模式
    val spark = SparkSession.builder().appName("flatMap-UDTF-Test").master("local[2]")
      .enableHiveSupport()
      .getOrCreate()
    val df =spark.createDataFrame(
      Seq((1,"Hello world"),(2,"Toby Gao")
      )).toDF("id","words")
    df.show()
    
    import spark.implicits._
    val result = df.flatMap(row => {
      val list = scala.collection.mutable.ListBuffer[(Int,String,String)]() //這裡不能用case class ,用tuple來定義ListBuffer中的變量類型可以
      val id = row.getAs[Int]("id")
      val words = row.getAs[String]("words").split(" ")
      for (word <- words) {
        list.append((id, word,"flatMap"))
      }
      list
    }).toDF("id","word","flag")

    result.printSchema()

    result.show(false)

  }
}
           

解讀:

輸入的DF是2行*2列的:

Scala使用flatMap來實作UDTF的功能效果

輸出的DF的Schema是:

Scala使用flatMap來實作UDTF的功能效果

輸出的DF結果是:4行*3列的

Scala使用flatMap來實作UDTF的功能效果

執行個體二:使用zip将需要輸出的多列資料對應起來

//in scala
package com.jd.datamill9n.goal.metrics.JD

import org.apache.spark.sql.SparkSession

object udtf {
  def main(args:Array[String])={

    //因為我用了本地core來運作,是以master設定了local模式
    val spark = SparkSession.builder().appName("flatMap-UDTF-Test").master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

    val df =spark.createDataFrame(
      Seq((1,"Hello world"),(2,"Toby Gao")
      )).toDF("id","words")
    df.show()
    
    //注意import進來implicits
    import spark.implicits._ 
    
    //使用zip将需要輸出的多列對應起來
    df.flatMap{row =>{val id=row.getAs[Int]("id")
      val words = row.getAs[String]("words")
      Array(id,id).zip(words.split(" "))}
    }.toDF("id","word").show()



  }
}

           
Scala使用flatMap來實作UDTF的功能效果

繼續閱讀