SparkSql可以方便地使用sql来处理数据,实际中经常会大量使用。在处理复杂逻辑时,为了避免写出又长又难以理解的SQL,可以实现自定义函数,再将其注册后,供sql调用。不仅减少sql的复杂度,还增加了代码的重用。下面就实现一个例子来说明,目的是以15分钟为统计周期,统计订单量。
import java.util.Calendar
import java.text.SimpleDateFormat
//时间加减函数
// Shift a "yyyy-MM-dd HH:mm:ss" timestamp by `seconds` (may be negative)
// and return it in the same format. Rolls over minute/hour/day boundaries.
def addSeconds(strDateTime:String,seconds:Int):String={
  val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val cal = Calendar.getInstance()
  cal.setTime(fmt.parse(strDateTime))
  cal.add(Calendar.SECOND, seconds)
  fmt.format(cal.getTime)
}
//时间转换函数,如将2016-10-10 00:08:08 转换到2016-10-10 00:15:00
// Round a timestamp UP to the end of its statistics window,
// e.g. "2016-10-10 00:08:08" -> "2016-10-10 00:15:00".
//
// date   - timestamp formatted "yyyy-MM-dd HH:mm:ss"
// minute - the minute-of-hour of `date` (0-59)
// dur    - window length in minutes; only 15 is supported
//
// Throws IllegalArgumentException for an unsupported `dur` or an
// out-of-range `minute` (the original matches threw a bare MatchError).
def endTime(date:String,minute:Int,dur:Int):String={
  require(dur == 15, s"unsupported window length: $dur (only 15 is supported)")
  require(minute >= 0 && minute < 60, s"minute out of range: $minute")
  // date.substring(0,14) is the "yyyy-MM-dd HH:" prefix.
  if (minute < 15) date.substring(0,14) + "15:00"
  else if (minute < 30) date.substring(0,14) + "30:00"
  else if (minute < 45) date.substring(0,14) + "45:00"
  else {
    // 45-59 rounds up to the top of the NEXT hour; use Calendar so that
    // day/month/year boundaries (e.g. 23:50 on Dec 31) roll over correctly.
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val c = Calendar.getInstance()
    c.setTime(fmt.parse(date))
    c.add(Calendar.HOUR_OF_DAY, 1)
    fmt.format(c.getTime).substring(0,14) + "00:00"
  }
}
import org.apache.spark._
// Fix: SQLContext lives in org.apache.spark.sql, which the wildcard
// import above does not cover, and the class name is SQLContext, not SqlContext.
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Register the UDF for use in SQL; `endTime _` eta-expands the method
// into a function value.
sqlContext.udf.register("endTime", endTime _)

// Count orders per 15-minute window.
// Fixes vs. the original query:
//  - `t,newtime` was a typo for `t.newtime` in select/group/order;
//  - grouping by t.order_id as well would produce one row per order,
//    not one row per window — group by the window column only;
//  - `order by time` inside the subquery had no effect on the aggregate
//    and was dropped.
val querySQL = """select count(t.order_id) totalNum, t.newtime
from (
  select order_id, time, endTime(time, minute(time), 15) newtime
  from orders
) t
group by t.newtime
order by t.newtime
"""

// Fix: show() returns Unit, so the original `results` was useless;
// keep the DataFrame, then display it.
val results = sqlContext.sql(querySQL)
results.show()
// Questions and corrections are welcome!