天天看点

Spark 08 Spark SQL 实战:日志分析(一)介绍、数据清洗

代码及测试文件下载

1 离线数据处理流程

1)数据采集

  • Flume:web日志写入到HDFS

2)数据清洗

  • 脏数据处理
  • 可以使用Spark、Hive、MapReduce
  • 清洗完之后数据可以放在HDFS

3)数据处理

  • 按照需求进行业务统计和分析
  • 使用Spark、Hive、MapReduce或者其他分布式计算框架

4)处理结果入库

  • 结果存放在RDBMS、NoSQL

5)数据可视化

  • 通过图形化展示的方式展现出来:饼图、折线图等
  • 使用ECharts、HUE等

【流程图】

Spark 08 Spark SQL 实战:日志分析(一)介绍、数据清洗

2 项目需求

1)统计最受欢迎的课程的TopN访问次数

2)按地市统计最受欢迎的TopN课程

3)按流量统计最受欢迎的TopN课程

3 日志内容构成

- - [/Nov/::: +] "GET /course/ajaxlastmediarecom?cid=563 HTTP/1.1"   "www.imooc.com" "http://www.imooc.com/video/10686" - "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36" "-" :   
 - - [/Nov/::: +] "HEAD / HTTP/1.1"   "117.121.101.40" "-" - "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.16.2.3 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2" "-" - - - 
 - - [/Nov/::: +] "POST /course/ajaxmediauser HTTP/1.1"   "www.imooc.com" "http://www.imooc.com/code/1431" mid=&time= "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.122 Safari/537.36 SE 2.X MetaSr 1.0" "-" :   
 - - [/Nov/::: +] "POST /course/ajaxmediauser/ HTTP/1.1"   "www.imooc.com" "http://www.imooc.com/video/678" mid=&time=&learn_time= "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36" "-" :   
           

4 数据清洗

1)第一次清洗:格式化原始日志数据

SparkStatFormatJob.scala

package log

import org.apache.spark.sql.SparkSession

/**
  *  第一步数据清洗:抽取需要的指定列的数据
  */
object SparkStatFormatJob {

  def main(args: Array[String]): Unit = {

    val spark=SparkSession.builder().appName("SparkStatFormatJob").master("local[2]").getOrCreate()

    // 获取文件
    val access=spark.sparkContext.textFile("/Users/Mac/testdata/10000_access.log")

    access.map(line=>{
      val splits=line.split(" ")
      val ip=splits()

      /**
        * 获取日志中完整的访问时间
        * 并转换日期格式
        */
      val time=splits()+" "+splits()
      val url=splits().replaceAll("\"","")
      val traffic=splits()

      DateUtils.parse(time)+"\t"+url+"\t"+traffic+"\t"+ip
    }).saveAsTextFile("/Users/Mac/testdata/spark-output/")

    spark.stop()
  }

}
           
Spark 08 Spark SQL 实战:日志分析(一)介绍、数据清洗

DateUtils.scala

package log

import java.util.{Date, Locale}

import org.apache.commons.lang3.time.FastDateFormat

/**
  * 日期时间解析工具类
  */
object DateUtils {

  // 输入文件日期时间格式
  val SOURCE_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

  // 目标日期格式
  val TARGET_TIME_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  /**
    * 获取时间
    */
  def parse(time: String) = {
    TARGET_TIME_FORMAT.format(new Date(getTime(time)))
  }

  /**
    * 获取输入日志时间
    */

  def getTime(time: String) = {
    try {
      SOURCE_TIME_FORMAT.parse(time.substring(time.indexOf("[") + , time.lastIndexOf("]"))).getTime
    } catch {
      case e: Exception => {
        l
      }
    }

  }

  def main(args: Array[String]): Unit = {
    println(parse("[10/Nov/2016:00:01:02 +0800]"))
  }

}

           

清洗完的部分数据:

Spark 08 Spark SQL 实战:日志分析(一)介绍、数据清洗

2)二次清洗:日志解析

输入:访问时间、访问URL、耗费的流量、访问的IP地址信息

输出:URL,课程类型,课程编号,流量,IP,城市信息,访问时间,天

需要使用第三方包:ipdatabase

// 安装到maven仓库
mvn clean install -DskipTests
           

修改pom.xml,添加依赖

<!-- 解析IP地址 -->
    <dependency>
      <groupId>com.ggstar</groupId>
      <artifactId>ipdatabase</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>

    <dependency>
      <groupId>org.apache.poi</groupId>
      <artifactId>poi-ooxml</artifactId>
      <version>3.14</version>
    </dependency>

    <dependency>
      <groupId>org.apache.poi</groupId>
      <artifactId>poi</artifactId>
      <version>3.14</version>
    </dependency>
           

IpUtils.scala

package log

import com.ggstar.util.ip.IpHelper

/**
  * ip解析工具类
  */
object IpUtils {

  def getCity(ip:String)={
    IpHelper.findRegionByIp(ip)
  }

  def main(args: Array[String]): Unit = {
    println(getCity("218.75.35.226"))
  }
}
           

AccessConvertUtil.scala

package log

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

/**
  * 日志数据转换(输入==>输出)工具类
  */
object AccessConvertUtil {

  // 定义输出字段
  val struct = StructType(
    Array(
      StructField("url", StringType),
      StructField("classType", StringType),
      StructField("classId", LongType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("city", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    )
  )

  /**
    * 根据输入的每一行信息转换成输出的格式
    */
  def parseLog(log: String) = {

    try {
      val splits = log.split("\t")

      val url = splits()
      val traffic = splits().toLong
      val ip = splits()

      val domain = "http://www.imooc.com/"

      val className = url.substring(url.indexOf(domain) + domain.length)

      val classTypeId = className.split("/")

      var classType = ""
      var classId = l

      if (classTypeId.length > ) {
        classType = classTypeId()
        if (classId.isInstanceOf[Long]){
          classId = classTypeId().toLong
        }else{
          classId=l
        }

      }

      val city = IpUtils.getCity(ip)
      val time = splits()
      val day = time.substring(, ).replaceAll("-", "")

      // row要和struct对应上
      Row(url, classType, classId, traffic, ip, city, time, day)

    } catch {
      case e: Exception => Row("","",l,l,"","","","")
    }
  }

}
           

SparkStatCleanJob.scala

package log


import org.apache.spark.sql.{SaveMode, SparkSession}


/**
  * 使用spark完成数据清洗
  */
object SparkStatCleanJob {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("SparkStatCleanJob").master("local[2]").getOrCreate()

    val accessRDD = spark.sparkContext.textFile("/Users/Mac/testdata/access.log")

    val accessDF = spark.createDataFrame(accessRDD.map(x => AccessConvertUtil.parseLog(x)),
      AccessConvertUtil.struct)

//    accessDF.printSchema()
//    accessDF.show(false)

    // coalesce:设置输出文件的个数,默认个
    accessDF.coalesce().write.format("parquet").mode(SaveMode.Overwrite)
      .partitionBy("day").save("/Users/Mac/testdata/log_clean/")
  }
}
           
Spark 08 Spark SQL 实战:日志分析(一)介绍、数据清洗
Spark 08 Spark SQL 实战:日志分析(一)介绍、数据清洗

继续阅读