// Example 1: using udf() with the DataFrame API (udf()的使用)
// Build a local SparkSession for this example.
val spark = SparkSession.builder()
  .master("local")
  .appName(this.getClass.getSimpleName)
  .getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._

// Parse the access log into Rows of (domain, ip), tolerating missing fields.
// BUGFIX: the original used Some(splits(i)).getOrElse(default) — Some(x) is
// always defined, so the default could never apply, and a line with fewer
// than two fields threw ArrayIndexOutOfBoundsException at splits(1) before
// getOrElse was even reached. splits.lift(i) returns None for an out-of-range
// index, so the defaults now actually kick in for short lines.
val valueRdd = spark.sparkContext.textFile("data/etl_access.log").map { line =>
  val splits = line.split("\t")
  Row(
    splits.lift(0).getOrElse("www"),            // domain; default when field absent
    splits.lift(1).getOrElse("111.111.111.111") // ip; default when field absent
  )
}

// Schema matching the two string columns produced above.
val schema = StructType(Array(
  StructField("domain", StringType),
  StructField("ip", StringType)
))

// Define the UDF wrapping the project helper.
// NOTE(review): Ip2Util.getAaddressByIp is defined elsewhere; assumed to map
// an IP string to an address string — confirm against its definition.
val getcity = udf(Ip2Util.getAaddressByIp(_: String))

// Build the DataFrame and apply the UDF to every row's ip column.
val df = spark.createDataFrame(valueRdd, schema)
df.select($"domain", getcity($"ip")).show()
spark.stop()
// Example 2: registering a UDF for Spark SQL (register()的使用)
// Build a local SparkSession for this example.
val spark = SparkSession.builder()
  .master("local")
  .appName("SparkSessionApp")
  .getOrCreate()
import spark.implicits._

// Load the raw team data and expose it as a temporary SQL view named "teams".
// (Renamed the local from `udf` to avoid shadowing confusion with UDF APIs.)
val teamsRdd = spark.sparkContext
  .textFile("data/udf.txt")
  .map(_.split("\t"))
  .map(fields => FootballTeam(fields(0), fields(1)))
teamsRdd.toDF().createOrReplaceTempView("teams")

/**
 * Step 1: define the UDF and register it under a SQL-visible name.
 * Step 2: call it from a SQL statement like any built-in function.
 */
spark.udf.register("teams_length", (input: String) => input.split(",").length)

spark.sql("select name,teams,teams_length(teams) from teams").show()
spark.stop()