
SparkSQL in Action 8: Completing the Log Analysis Project, Part 4

Requirement: compute the TopN most popular courses on the main site by traffic, and save the results to MySQL.

First, create a table in MySQL. The composite primary key on (day, cms_id) keeps one row per course per day:

create table day_video_traffics_topn_stat(
day varchar(8) not null,
cms_id bigint(10) not null,
traffics bigint(20) not null,
primary key (day,cms_id)
);
           

Create an entity class DayVideoTrafficsStat to hold one result row:

case class DayVideoTrafficsStat(day:String,cmsId:Long,traffics:Long)
           

Add a batch-insert method to StatDAO; this is the method the Spark job below calls:

/**
  * Batch-insert DayVideoTrafficsStat records into the database.
  * Requires: import java.sql.{Connection, PreparedStatement} and scala.collection.mutable.ListBuffer
  */
def insertDayVideoTrafficsAccessTopN(list: ListBuffer[DayVideoTrafficsStat]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
        connection = MySQLUtils.getConnection()

        connection.setAutoCommit(false) // switch to manual commit so the whole batch is one transaction

        val sql = "insert into day_video_traffics_topn_stat(day,cms_id,traffics) values (?,?,?)"
        pstmt = connection.prepareStatement(sql)

        for (ele <- list) {
            pstmt.setString(1, ele.day)
            pstmt.setLong(2, ele.cmsId)
            pstmt.setLong(3, ele.traffics)
            pstmt.addBatch()
        }
        pstmt.executeBatch() // execute the whole batch in one go
        connection.commit()  // commit manually
    } catch {
        case e: Exception => e.printStackTrace()
    } finally {
        MySQLUtils.release(connection, pstmt)
    }
}
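
The method depends on the MySQLUtils helper built in an earlier part of this series. For reference, here is a minimal sketch of what it needs to provide, assuming plain JDBC; the URL, user, and password are placeholders, not values from the original post:

import java.sql.{Connection, DriverManager, PreparedStatement}

object MySQLUtils {

    // open a connection; adjust the URL and credentials for your environment (placeholders)
    def getConnection(): Connection = {
        DriverManager.getConnection(
            "jdbc:mysql://localhost:3306/log_project?user=root&password=root")
    }

    // release resources in reverse order of acquisition
    def release(connection: Connection, pstmt: PreparedStatement): Unit = {
        try {
            if (pstmt != null) pstmt.close()
        } catch {
            case e: Exception => e.printStackTrace()
        } finally {
            if (connection != null) connection.close()
        }
    }
}

Disabling auto-commit and batching the inserts greatly reduces JDBC round-trips, and the explicit commit makes each batch atomic.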
           

Add the following method to the main Spark application and call it from the main function:

// TopN most popular courses on the main site, ranked by traffic.
// Requires: import org.apache.spark.sql.functions._ (for sum) and scala.collection.mutable.ListBuffer
def videoTrafficsTopNStat(spark: SparkSession, accessDF: DataFrame): Unit = {
    // compute the statistics with the DataFrame API
    import spark.implicits._
    val videoTrafficsTopNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(sum("traffic").as("traffics"))
      .orderBy($"traffics".desc)
      //.show(false)

    /**
      * Write the result to MySQL
      */
    try {
        videoTrafficsTopNDF.foreachPartition(partitionOfRecords => {
            val list = new ListBuffer[DayVideoTrafficsStat]

            partitionOfRecords.foreach(info => {
                val day = info.getAs[String]("day")
                val cmsId = info.getAs[Long]("cmsId")
                val traffics = info.getAs[Long]("traffics")
                list.append(DayVideoTrafficsStat(day, cmsId, traffics))
            })

            // one batch insert per partition
            StatDAO.insertDayVideoTrafficsAccessTopN(list)
        })
    } catch {
        case e: Exception => e.printStackTrace()
    }
}
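
For completeness, a minimal sketch of the main entry point that wires this together; the app name, master, and input path are placeholders, not from the original post:

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TopNStatJob")   // placeholder name
      .master("local[2]")       // placeholder master
      .getOrCreate()

    // the cleaned access log produced in the earlier parts (placeholder path)
    val accessDF = spark.read.format("parquet").load("/path/to/clean")

    videoTrafficsTopNStat(spark, accessDF)

    spark.stop()
}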
           

Run the program and check the inserted rows in MySQL, for example with: select * from day_video_traffics_topn_stat order by traffics desc;


Code refactoring: delete any existing rows for the target day before inserting, so the job can be re-run safely.

Add a deleteData method to the DAO layer that removes the data for a specified day:

// delete existing rows for the given day from all three result tables
def deleteData(day: String): Unit = {

    val tables = Array("day_video_access_topn_stat",
        "day_video_city_access_topn_stat",
        "day_video_traffics_topn_stat")

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
        connection = MySQLUtils.getConnection()
        for (table <- tables) {
            val deleteSQL = s"delete from $table where day = ?"
            pstmt = connection.prepareStatement(deleteSQL)
            pstmt.setString(1, day)
            pstmt.executeUpdate()
            pstmt.close() // close each statement before preparing the next; release() below is a no-op on a closed statement
        }
    } catch {
        case e: Exception => e.printStackTrace()
    } finally {
        MySQLUtils.release(connection, pstmt)
    }
}
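
With deleteData in place, the job can be re-run for the same day without primary-key conflicts on (day, cms_id). A sketch of the hook at the start of the main program; the day value matches the hard-coded filter used above:

val day = "20170511"

// remove rows left over from a previous run, then recompute
StatDAO.deleteData(day)
videoTrafficsTopNStat(spark, accessDF)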
           
