天天看點

小知識整理----SPARK UD函數 udf()與udf.register()的使用

udf()的使用

val spark = SparkSession.builder()
      .master("local")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()
    import org.apache.spark.sql.functions._
    import  spark.implicits._
    
    val valuerdd = spark.sparkContext.textFile("data/etl_access.log").map(
      x => {
        val splits = x.split("\t")
        //注意 rowrdd (是否缺失字段)
        Row(Some(splits(0)).getOrElse("www"), Some(splits(1)).getOrElse("111.111.111.111"))
      }
    )
    val schema = StructType(Array(
      StructField("domain",StringType),
      StructField("ip",StringType)
    ))
    //自定義函數Udf 注意 導包
    val getcity = udf(Ip2Util.getAaddressByIp(_:String))
    //建立df
    val df = spark.createDataFrame(valuerdd,schema)
    df.select($"domain",getcity($"ip")).show()
    spark.stop()
           

register()的使用

val spark = SparkSession.builder()
      .master("local")
      .appName("SparkSessionApp")
      .getOrCreate()

    import spark.implicits._
   
    val udf = spark.sparkContext.textFile("data/udf.txt")
      .map(_.split("\t"))
      .map(x => FootballTeam(x(0), x(1)))
    udf.toDF().createOrReplaceTempView("teams")
	 /**
      * step1: 定義 注冊
      * step2: 使用
      */
    spark.udf.register("teams_length",(input:String)=>{
      input.split(",").length
    })

    spark.sql("select name,teams,teams_length(teams) from teams").show()

    spark.stop()
           

繼續閱讀