需求:按流量統計主站最受歡迎的TopN課程,并儲存到MySQL
建立一張表:
-- Per-day traffic totals for each course; one row per (day, cms_id) pair.
CREATE TABLE day_video_traffics_topn_stat (
  day      VARCHAR(8) NOT NULL,  -- day key, at most 8 characters (e.g. 20170511)
  cms_id   BIGINT(10) NOT NULL,  -- course/content id
  traffics BIGINT(20) NOT NULL,  -- summed traffic for that course on that day
  PRIMARY KEY (day, cms_id)
);
建立一個實體類DayVideoTrafficsStat:
/**
 * One row of the per-day traffic statistics for a course.
 *
 * @param day      day key of the statistic
 * @param cmsId    id of the course the traffic belongs to
 * @param traffics total traffic accumulated for that course on that day
 */
case class DayVideoTrafficsStat(
  day: String,
  cmsId: Long,
  traffics: Long
)
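在StatDAO中添加方法(注意:下面貼出的是按城市統計的insertDayCityVideoAccessTopN;本節主程式實際調用的insertDayVideoTrafficsAccessTopN寫法相同,只需把參數類型換成ListBuffer[DayVideoTrafficsStat],并向day_video_traffics_topn_stat表插入day、cms_id、traffics三個欄位):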
/**
 * Batch-inserts per-city video access TopN statistics into MySQL.
 *
 * All rows are queued on one PreparedStatement and sent as a single JDBC batch
 * inside a manually committed transaction on `day_video_city_access_topn_stat`.
 * Failures are printed rather than rethrown; on failure the pending transaction
 * is rolled back explicitly before the connection is released.
 *
 * @param list the statistics to persist; an empty list is a no-op
 */
def insertDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {
  // Nothing to write (e.g. an empty Spark partition): skip the database round trip.
  if (list.nonEmpty) {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false) // commit manually so the whole batch is one transaction
      val sql = "insert into day_video_city_access_topn_stat(day,cms_id,city,times,times_rank) values (?,?,?,?,?) "
      pstmt = connection.prepareStatement(sql)
      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)
        pstmt.addBatch()
      }
      pstmt.executeBatch() // send every queued row in one batch
      connection.commit() // manual commit
    } catch {
      case e: Exception =>
        e.printStackTrace()
        // Auto-commit is off, so undo whatever part of the batch was executed.
        if (connection != null) {
          try {
            connection.rollback()
          } catch {
            case rollbackError: Exception => rollbackError.printStackTrace()
          }
        }
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}
在Spark主應用程式中添加一個方法,并且在主函數中調用這個方法:
/**
 * Aggregates the traffic of every video course for one day and stores it in MySQL.
 *
 * Rows of `accessDF` whose `cmsType` is "video" and whose `day` equals the requested
 * day are grouped by (day, cmsId); the `traffic` column is summed into `traffics` and
 * the result is ordered by `traffics` descending. Each partition is converted into
 * DayVideoTrafficsStat records and passed to StatDAO.insertDayVideoTrafficsAccessTopN.
 * Exceptions raised while running the job are printed, not rethrown.
 *
 * @param spark    session supplying the implicits for the `$"col"` column syntax
 * @param accessDF access-log DataFrame; read columns: day, cmsType, cmsId, traffic
 * @param day      day to aggregate; defaults to the previously hard-coded "20170511"
 */
def videoTrafficsTopNStat(
    spark: SparkSession,
    accessDF: DataFrame,
    day: String = "20170511"): Unit = {
  // Statistics are computed with the DataFrame API.
  import spark.implicits._

  val trafficsTopNDF = accessDF
    .filter($"day" === day && $"cmsType" === "video")
    .groupBy("day", "cmsId")
    .agg(sum("traffic").as("traffics"))
    .orderBy($"traffics".desc)

  // Write the aggregated result to MySQL, one batch insert per partition.
  try {
    trafficsTopNDF.foreachPartition(partitionOfRecords => {
      val list = new ListBuffer[DayVideoTrafficsStat]
      partitionOfRecords.foreach(info => {
        val statDay = info.getAs[String]("day")
        val cmsId = info.getAs[Long]("cmsId")
        val traffics = info.getAs[Long]("traffics")
        list.append(DayVideoTrafficsStat(statDay, cmsId, traffics))
      })
      StatDAO.insertDayVideoTrafficsAccessTopN(list)
    })
  } catch {
    case e: Exception => e.printStackTrace()
  }
}
執行程式,觀察資料庫插入結果:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLzMGRNp3ZU5EeJpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLzEzM0UzMxgTMyEzMwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
代碼重構:删除表中指定日期的,已有的資料
在DAO層新增一個方法deleteData,用來删除指定日期的資料
/**
 * Deletes the rows already stored for the given day from every statistics table.
 *
 * Runs `delete from <table> where day = ?` against day_video_access_topn_stat,
 * day_video_city_access_topn_stat and day_video_traffics_topn_stat on a single
 * connection. Exceptions are printed, not rethrown.
 *
 * @param day day key whose existing rows should be removed
 */
def deleteData(day: String): Unit = {
  val tables = Array("day_video_access_topn_stat",
    "day_video_city_access_topn_stat",
    "day_video_traffics_topn_stat")
  var connection: Connection = null
  var pstmt: PreparedStatement = null
  try {
    connection = MySQLUtils.getConnection()
    for (table <- tables) {
      // Close the statement of the previous table before preparing the next one;
      // otherwise only the last statement would ever be released.
      if (pstmt != null) {
        pstmt.close()
      }
      // Table names come from the fixed array above; only the day is bound as a parameter.
      val deleteSQL = s"delete from $table where day = ?"
      pstmt = connection.prepareStatement(deleteSQL)
      pstmt.setString(1, day)
      pstmt.executeUpdate()
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Releases the connection together with the last (or failing) statement.
    MySQLUtils.release(connection, pstmt)
  }
}