Contents
- 1. Offline Behavior Analysis
- 2. Interactive Behavior Analysis
- 3. Hands-on DataSet Example
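1. Offline Behavior Analysis
Every day in the early morning, the previous day's data is batch-processed and analyzed. The resulting metrics and reports are written to a relational database such as MySQL. The next day, users can view the analysis results for yesterday and every day before it.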
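A nightly job first has to work out which slice of data counts as "yesterday". The sketch below is a minimal version under a few assumptions. NightlyWindow, yesterdayWindow and filterExpr are hypothetical names. actionTime is assumed to be an ISO-formatted string, as in the Spark code later in this post. The window is half-open, [yesterday, today), so rows that carry a time of day on the last date are still included.

```scala
import java.time.LocalDate

// Hypothetical helper (not from the original post): computes the date window
// a nightly batch job processes when it runs in the early morning of runDate.
object NightlyWindow {
  /** Returns (startInclusive, endExclusive) as "yyyy-MM-dd" strings. */
  def yesterdayWindow(runDate: LocalDate): (String, String) = {
    val yesterday = runDate.minusDays(1)
    (yesterday.toString, runDate.toString) // LocalDate.toString is ISO yyyy-MM-dd
  }

  /** Builds a Spark SQL filter string; assumes actionTime is an ISO-formatted string column. */
  def filterExpr(runDate: LocalDate): String = {
    val (start, end) = yesterdayWindow(runDate)
    s"actionTime >= '$start' and actionTime < '$end'"
  }
}
```

The resulting string can be passed to Dataset.filter in the same way as the hard-coded filters in the code below.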
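2. Interactive Behavior Analysis
Whenever users need an answer, they choose the query and analysis conditions, and our system immediately launches a big-data processing job that returns the requested data as quickly as possible. This is what we call interactive user behavior analysis.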
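In the interactive case, the conditions come from the user instead of being hard-coded. It is worth validating them before building a filter string. The sketch below is illustrative only. QueryRequest, InteractiveQuery and buildFilter are hypothetical names. The mapping actionType 0 = visit, 1 = purchase is inferred from how the queries below use it. Parsing the dates with LocalDate.parse rejects impossible dates such as 2016-09-31. It also rejects injected SQL fragments before they reach the query string.

```scala
import java.time.LocalDate
import java.time.format.DateTimeParseException

// Hypothetical request type: the conditions a user picks in an interactive query UI.
final case class QueryRequest(startDate: String, endDate: String, actionType: Int)

object InteractiveQuery {
  /** Validates user input and returns either an error message or a Spark SQL filter string. */
  def buildFilter(req: QueryRequest): Either[String, String] =
    try {
      val start = LocalDate.parse(req.startDate) // strict ISO yyyy-MM-dd parsing
      val end = LocalDate.parse(req.endDate)
      if (end.isBefore(start)) Left(s"endDate $end is before startDate $start")
      // Assumption: 0 = visit, 1 = purchase, as inferred from the queries in this post.
      else if (req.actionType != 0 && req.actionType != 1) Left(s"unknown actionType ${req.actionType}")
      else Right(s"actionTime >= '$start' and actionTime <= '$end' and actionType = ${req.actionType}")
    } catch {
      case e: DateTimeParseException => Left(s"invalid date: ${e.getParsedString}")
    }
}
```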
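3. Hands-on DataSet Example
Requirements:
- Find the ten users with the most visits within a specified time range
- Find the ten users with the highest total purchase amount within a specified time range
- Find the ten users whose visit count grew the most in the most recent period compared with the previous period
- Find the ten users whose purchase amount grew the most in the most recent period compared with the previous period
- Among users who registered within a specified time range, find the ten with the most visits during their first seven days after registration
- Among users who registered within a specified time range, find the ten with the highest purchase amount during their first seven days after registration
Code: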
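The first two requirements share one shape: filter by time range and action type, group by user, aggregate, sort descending and keep ten. The sketch below reproduces that pipeline on plain Scala collections, so the logic can be checked without Spark. ActionLog and TopUsers are hypothetical names; ActionLog mirrors the UserActionLog case class in the code. The join with userBaseInfo, which only adds the username, is left out. The sketch also breaks ties by userId so that the result is deterministic. Spark's sort(...).limit(10) does not guarantee any particular order among tied rows.

```scala
// Hypothetical record mirroring UserActionLog (assumes actionType 0 = visit, 1 = purchase).
final case class ActionLog(logId: Long, userId: Long, actionTime: String, actionType: Long, purchaseMoney: Double)

object TopUsers {
  // Inclusive [start, end] comparison on ISO strings, like the string filters in the Spark code.
  private def inRange(log: ActionLog, start: String, end: String): Boolean =
    log.actionTime.compareTo(start) >= 0 && log.actionTime.compareTo(end) <= 0

  /** filter -> groupBy(userId) -> count -> sort desc -> take(n) */
  def topByVisits(logs: Seq[ActionLog], start: String, end: String, n: Int = 10): Seq[(Long, Int)] =
    logs
      .filter(l => inRange(l, start, end) && l.actionType == 0)
      .groupBy(_.userId)
      .map { case (userId, userLogs) => (userId, userLogs.size) }
      .toSeq
      .sortBy { case (userId, count) => (-count, userId) } // desc by count, ties by userId
      .take(n)

  /** filter -> groupBy(userId) -> sum(purchaseMoney) -> sort desc -> take(n) */
  def topByPurchase(logs: Seq[ActionLog], start: String, end: String, n: Int = 10): Seq[(Long, Double)] =
    logs
      .filter(l => inRange(l, start, end) && l.actionType == 1)
      .groupBy(_.userId)
      .map { case (userId, userLogs) => (userId, userLogs.map(_.purchaseMoney).sum) }
      .toSeq
      .sortWith((a, b) => if (a._2 != b._2) a._2 > b._2 else a._1 < b._1) // desc by total, ties by userId
      .take(n)
}
```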
package com.study.spark2project
import org.apache.spark.sql.SparkSession
object UserActiveDegreeAnalyze {
case class UserActionLog(logId:Long, userId:Long, actionTime:String, actionType:Long, purchaseMoney:Double)
case class UserActionLogVO(logId:Long, userId:Long, actionValue:Long)
case class UserActionLogWithPurchaseMoneyVO(logId:Long, userId:Long, purchaseMoney:Double)
def main(args:Array[String]): Unit ={
val startDate = "2016-09-01"
val endDate = "2016-11-01"
val spark = SparkSession
.builder()
.appName("UserActiveDegreeAnalyze")
.master("local")
.config("spark.sql.warehouse.dir", "G:\\data\\spark-warehouse")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val userBaseInfo = spark.read.json("G:\\data\\user_base_info.json")
val userActionLog = spark.read.json("G:\\data\\user_action_log.json")
// Top 10 users by visit count (actionType = 0) between startDate and endDate
userActionLog
.filter("actionTime >= '"+ startDate +"' and actionTime <= '"+ endDate+"' and actionType = 0")
.join(userBaseInfo, userActionLog("userId") === userBaseInfo("userId"))
.groupBy(userBaseInfo("userId"), userBaseInfo("username"))
.agg(count(userActionLog("logId")).alias("actionCount"))
.sort($"actionCount".desc)
.limit(10)
.show()
// Top 10 users by total purchase amount (actionType = 1) between startDate and endDate
userActionLog
.filter("actionTime >= '"+ startDate +"' and actionTime <= '"+ endDate+"' and actionType = 1")
.join(userBaseInfo, userActionLog("userId") === userBaseInfo("userId"))
.groupBy(userBaseInfo("userId"), userBaseInfo("username"))
.agg(round(sum(userActionLog("purchaseMoney")),2).alias("totalPurchaseMoney"))
.sort($"totalPurchaseMoney".desc)
.limit(10)
.show()
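// Top 10 users by growth in visit count: each visit in the recent period counts +1, each visit in the previous period counts -1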
// Recent period (October 2016): each visit contributes +1
val userActionLogInFirstPeriod = userActionLog.as[UserActionLog]
.filter("actionTime >= '2016-10-01' and actionTime <= '2016-10-31' and actionType = 0")
.map{ userActionLogEntry => UserActionLogVO(userActionLogEntry.logId, userActionLogEntry.userId, 1)}
// Previous period (September 2016): each visit contributes -1
val userActionLogInSecondPeriod = userActionLog.as[UserActionLog]
.filter("actionTime >= '2016-09-01' and actionTime <= '2016-09-30' and actionType = 0")
.map{ userActionLogEntry => UserActionLogVO(userActionLogEntry.logId, userActionLogEntry.userId, -1)}
val userActionLogDS = userActionLogInFirstPeriod.union(userActionLogInSecondPeriod)
userActionLogDS
.join(userBaseInfo, userActionLogDS("userId") === userBaseInfo("userId"))
.groupBy(userBaseInfo("userId"), userBaseInfo("username"))
.agg(sum(userActionLogDS("actionValue")).alias("actionIncr"))
.sort($"actionIncr".desc)
.limit(10)
.show()
// Top 10 users by growth in purchase amount: recent-period amounts count as positive, previous-period amounts as negative
// Recent period (October 2016): purchase amounts kept positive
val userActionLogWithPurchaseMoneyInFirstPeriod = userActionLog.as[UserActionLog]
.filter("actionTime >= '2016-10-01' and actionTime <= '2016-10-31' and actionType = 1")
.map{userActionLogEntry => UserActionLogWithPurchaseMoneyVO(userActionLogEntry.logId,
userActionLogEntry.userId, userActionLogEntry.purchaseMoney)}
// Previous period (September 2016): purchase amounts negated; September has 30 days
val userActionLogWithPurchaseMoneyInSecondPeriod = userActionLog.as[UserActionLog]
.filter("actionTime >= '2016-09-01' and actionTime <= '2016-09-30' and actionType = 1")
.map{userActionLogEntry => UserActionLogWithPurchaseMoneyVO(userActionLogEntry.logId,
userActionLogEntry.userId, -userActionLogEntry.purchaseMoney)}
val userActionLogWithPurchaseMoneyDS = userActionLogWithPurchaseMoneyInSecondPeriod
.union(userActionLogWithPurchaseMoneyInFirstPeriod)
userActionLogWithPurchaseMoneyDS
.join(userBaseInfo, userBaseInfo("userId") === userActionLogWithPurchaseMoneyDS("userId"))
.groupBy(userBaseInfo("userId"), userBaseInfo("username"))
.agg(round(sum(userActionLogWithPurchaseMoneyDS("purchaseMoney")),2).alias("totalMoney"))
.sort($"totalMoney".desc)
.limit(10)
.show()
// Top 10 users by visits within 7 days of registration, among users registered in October 2016
userActionLog
.join(userBaseInfo, userBaseInfo("userId") === userActionLog("userId"))
.filter(userBaseInfo("registTime") >= "2016-10-01"
&& userBaseInfo("registTime") <= "2016-10-31"
&& userActionLog("actionTime") >= userBaseInfo("registTime")
&& userActionLog("actionTime") <= date_add(userBaseInfo("registTime"), 7)
&& userActionLog("actionType") === 0)
.groupBy(userBaseInfo("userId"), userBaseInfo("username"))
.agg(count(userActionLog("logId")).alias("logCount"))
.sort($"logCount".desc)
.limit(10)
.show()
// Top 10 users by purchase amount within 7 days of registration, among users registered in October 2016
userActionLog
.join(userBaseInfo, userBaseInfo("userId") === userActionLog("userId"))
.filter(userBaseInfo("registTime") >= "2016-10-01"
&& userBaseInfo("registTime") <= "2016-10-31"
&& userActionLog("actionTime") >= userBaseInfo("registTime")
&& userActionLog("actionTime") <= date_add(userBaseInfo("registTime"), 7)
&& userActionLog("actionType") === 1)
.groupBy(userBaseInfo("userId"), userBaseInfo("username"))
.agg(round(sum(userActionLog("purchaseMoney")),2).alias("totalMoney"))
.sort($"totalMoney".desc)
.limit(10)
.show()
}
}
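The period-over-period queries rely on a union trick. Each recent-period record is mapped to +1 and each previous-period record to -1, so that after the union a plain sum per user yields the increase. The "first seven days" queries join each action with the user's registration time and keep actions in [registTime, registTime + 7 days]. The plain-Scala sketch below shows both ideas on in-memory data. Log and PeriodCompare are hypothetical names. The sketch assumes registTime is a "yyyy-MM-dd" date string; LocalDate.parse would reject a value with a time part.

```scala
import java.time.LocalDate

// Hypothetical simplified record; field names follow UserActionLog (0 = visit is assumed).
final case class Log(userId: Long, actionTime: String, actionType: Long, purchaseMoney: Double)

object PeriodCompare {
  // Inclusive range check on ISO-formatted strings, like the Spark string filters.
  private def between(t: String, start: String, end: String): Boolean =
    t.compareTo(start) >= 0 && t.compareTo(end) <= 0

  /** Net visit change per user: +1 per visit in recent, -1 per visit in previous, then summed. */
  def visitIncrease(logs: Seq[Log], recent: (String, String), previous: (String, String)): Map[Long, Int] = {
    val visits = logs.filter(_.actionType == 0)
    val plus = visits.filter(l => between(l.actionTime, recent._1, recent._2)).map(l => (l.userId, 1))
    val minus = visits.filter(l => between(l.actionTime, previous._1, previous._2)).map(l => (l.userId, -1))
    (plus ++ minus) // plays the role of Dataset.union
      .groupBy(_._1)
      .map { case (userId, values) => (userId, values.map(_._2).sum) }
  }

  /** Visits in [registTime, registTime + 7 days] for users registered in [regStart, regEnd]. */
  def firstWeekVisits(logs: Seq[Log], registTime: Map[Long, String], regStart: String, regEnd: String): Map[Long, Int] =
    logs
      .filter(_.actionType == 0)
      .flatMap { l =>
        // Looking up registTime by userId acts as the inner join with userBaseInfo.
        registTime.get(l.userId).collect {
          case reg if between(reg, regStart, regEnd) &&
              between(l.actionTime, reg, LocalDate.parse(reg).plusDays(7).toString) => l.userId
        }
      }
      .groupBy(identity)
      .map { case (userId, hits) => (userId, hits.size) }
}
```

The purchase variants follow the same pattern. They replace +1/-1 with +purchaseMoney/-purchaseMoney and sum amounts instead of counting visits. Sorting the resulting map by value in descending order and taking ten gives the top-10 list.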
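Summary:
- The typed Dataset API (for example as[UserActionLog] followed by filter and map with Scala lambdas) provides RDD-style functional operators, but runs them on the Spark SQL engine with encoders instead of on plain RDDs.
- The untyped DataFrame API (expression-string filter, join, groupBy, agg, sort) mirrors SQL query operations. In Spark 2.x a DataFrame is simply a Dataset[Row].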