天天看点

[Spark应用]-- 自定义spark udf计算单词的长度

Spark Sql的UDF函数非常好用,相比Hive,很简洁

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.Row
/**
 * scala内置函数,用户自定义函数
 * 统计单词的长度
 * 
 */
object UDF {
  def main(args:Array[String]):Unit={
    //1\获取context
    val sc=new SparkContext(new SparkConf().setAppName("UDFTest").setMaster("local[1]"))
    //2\获取sqlContext
    val sqlContext=new SQLContext(sc)
    //3\创建测试数据Array
    val datas=Array("tim","jony","cassis","fang")
    //4\并行化,创建RDD
    val dataRDD=sc.parallelize(datas,4)
    //5\转换为row
    val rows=dataRDD.map { x => Row(x) }
    //6\创建structType
    val structType=StructType(Array(StructField("name",StringType,true)))
    //7\创建Dataframe
    val df=sqlContext.createDataFrame(rows, structType)
    //8\注册表
    df.registerTempTable("t_test")
    //9\注册统计长度的函数
    sqlContext.udf.register("str_len", (str:String)=>str.length())
    //10\sql语句,打印输出
//    sqlContext.sql("select str_len(name) from t_test").collect().foreach { x => println("长度是:"+x) }
    sqlContext.sql("select str_len(name) from t_test").show()
    //关闭资源
    sc.stop()    
  }
}
/**

结果:
+---+
|_c0|
+---+
|  3|
|  4|
|  6|
|  4|
+---+

*/      

以上测试已经通过,可以根据自己需要做修改!如有不足之处,请各位批评指正!

继续阅读