Download the code and test files
1 Offline Data Processing Pipeline
1) Data collection
- Flume: write the web logs into HDFS
2) Data cleansing
- handle dirty records
- can be done with Spark, Hive, or MapReduce
- the cleaned data can be stored back on HDFS
3) Data processing
- run the statistics and analysis the business requires
- use Spark, Hive, MapReduce, or another distributed computing framework
4) Storing the results
- persist the results in an RDBMS or a NoSQL store
5) Data visualization
- present the results graphically: pie charts, line charts, etc.
- with ECharts, HUE, etc.
[Flow diagram]
2 Project Requirements
1) Top N most-visited courses
2) Top N most popular courses by city
3) Top N most popular courses by traffic
3 Log Line Format
Sample lines (numeric fields are shown as <placeholders> below):
<ip> - - [<dd>/Nov/<yyyy>:<HH:mm:ss> +<zone>] "GET /course/ajaxlastmediarecom?cid=563 HTTP/1.1" <status> <bytes> "www.imooc.com" "http://www.imooc.com/video/10686" - "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36" "-" <upstream>:<port>
<ip> - - [<dd>/Nov/<yyyy>:<HH:mm:ss> +<zone>] "HEAD / HTTP/1.1" <status> <bytes> "117.121.101.40" "-" - "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.16.2.3 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2" "-" - - -
<ip> - - [<dd>/Nov/<yyyy>:<HH:mm:ss> +<zone>] "POST /course/ajaxmediauser HTTP/1.1" <status> <bytes> "www.imooc.com" "http://www.imooc.com/code/1431" mid=<id>&time=<t> "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.122 Safari/537.36 SE 2.X MetaSr 1.0" "-" <upstream>:<port>
<ip> - - [<dd>/Nov/<yyyy>:<HH:mm:ss> +<zone>] "POST /course/ajaxmediauser/ HTTP/1.1" <status> <bytes> "www.imooc.com" "http://www.imooc.com/video/678" mid=<id>&time=<t>&learn_time=<t> "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36" "-" <upstream>:<port>
The fields this project extracts are the client IP, the bracketed access time, the bytes sent (the traffic), and the referer URL that identifies the course page, as the probe below illustrates.
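Because the cleansing job in section 4 picks fields out of each line by position after splitting on spaces, it helps to confirm which index holds the IP, timestamp, traffic, and referer URL on the actual data. A minimal probe; the sample line is hypothetical and its numeric values are stand-ins:
object LogLineProbe {
  def main(args: Array[String]): Unit = {
    // a hypothetical line in the layout shown above; real values will differ
    val line = "10.100.0.1 - - [10/Nov/2016:00:01:02 +0800] " +
      "\"POST /course/ajaxmediauser HTTP/1.1\" 200 54 " +
      "\"www.imooc.com\" \"http://www.imooc.com/video/678\" ..."
    // print every space-separated field with its index
    line.split(" ").zipWithIndex.foreach { case (field, i) => println(s"$i -> $field") }
  }
}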
4 Data Cleansing
1) First pass: format the raw log data
SparkStatFormatJob.scala
package log
import org.apache.spark.sql.SparkSession
/**
 * First cleansing pass: extract only the columns we need
*/
object SparkStatFormatJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkStatFormatJob").master("local[2]").getOrCreate()
    // read the raw log file
    val access = spark.sparkContext.textFile("/Users/Mac/testdata/10000_access.log")
    access.map(line => {
      val splits = line.split(" ")
      // field positions follow the log layout shown in section 3
      val ip = splits(0)
      /**
       * Rebuild the complete access time from the two bracketed tokens
       * and convert it to the target date format
       */
      val time = splits(3) + " " + splits(4)
      val url = splits(11).replaceAll("\"", "")
      val traffic = splits(9)
      DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
    }).saveAsTextFile("/Users/Mac/testdata/spark-output/")
    spark.stop()
  }
}
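To sanity-check the result, the formatted output can be read back and printed. A minimal sketch; the object name FormatOutputCheck is just for illustration:
package log
import org.apache.spark.sql.SparkSession

object FormatOutputCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("FormatOutputCheck").master("local[2]").getOrCreate()
    // print the first few formatted lines: time \t url \t traffic \t ip
    spark.sparkContext.textFile("/Users/Mac/testdata/spark-output/").take(5).foreach(println)
    spark.stop()
  }
}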
DateUtils.scala
package log
import java.util.{Date, Locale}
import org.apache.commons.lang3.time.FastDateFormat
/**
 * Date and time parsing utilities
*/
object DateUtils {
  // date-time format of the input logs, e.g. 10/Nov/2016:00:01:02 +0800
  val SOURCE_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
  // target date format
  val TARGET_TIME_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  /**
   * Convert a raw bracketed timestamp into the target format
   */
  def parse(time: String) = {
    TARGET_TIME_FORMAT.format(new Date(getTime(time)))
  }

  /**
   * Extract the epoch milliseconds from the raw log timestamp
   */
  def getTime(time: String) = {
    try {
      SOURCE_TIME_FORMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime
    } catch {
      case e: Exception => 0l // fall back to epoch 0 if the timestamp cannot be parsed
    }
  }

  def main(args: Array[String]): Unit = {
    println(parse("[10/Nov/2016:00:01:02 +0800]")) // prints 2016-11-10 00:01:02 in the +0800 timezone
  }
}
Each cleaned line is tab-separated as time, URL, traffic, IP, for example (the traffic and IP values here are illustrative): 2016-11-10 00:01:02\thttp://www.imooc.com/video/10686\t54\t10.100.0.1
2) Second pass: parse the formatted logs
Input: access time, accessed URL, traffic consumed, client IP
Output: URL, course type, course id, traffic, IP, city, access time, day
This pass needs the third-party ipdatabase package for IP-to-region lookups:
// install it into the local Maven repository
mvn clean install -DskipTests
Then edit pom.xml and add the dependencies:
<!-- IP address resolution -->
<dependency>
<groupId>com.ggstar</groupId>
<artifactId>ipdatabase</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>3.14</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>3.14</version>
</dependency>
IpUtils.scala
package log
import com.ggstar.util.ip.IpHelper
/**
 * IP resolution utility
*/
object IpUtils {
  def getCity(ip: String) = {
    IpHelper.findRegionByIp(ip)
  }

  def main(args: Array[String]): Unit = {
    println(getCity("218.75.35.226"))
  }
}
AccessConvertUtil.scala
package log
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
/**
 * Log record conversion (input ==> output) utility
*/
object AccessConvertUtil {
  // output schema definition
val struct = StructType(
Array(
StructField("url", StringType),
StructField("classType", StringType),
StructField("classId", LongType),
StructField("traffic", LongType),
StructField("ip", StringType),
StructField("city", StringType),
StructField("time", StringType),
StructField("day", StringType)
)
)
/**
 * Convert one line of first-pass output into the output format
*/
  def parseLog(log: String) = {
    try {
      val splits = log.split("\t")
      // first-pass output order: time, url, traffic, ip
      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      val domain = "http://www.imooc.com/"
      val className = url.substring(url.indexOf(domain) + domain.length)
      val classTypeId = className.split("/")

      var classType = ""
      var classId = 0L
      if (classTypeId.length > 1) {
        classType = classTypeId(0)
        // the second path segment is the course id when it is numeric
        if (classTypeId(1).matches("\\d+")) {
          classId = classTypeId(1).toLong
        } else {
          classId = 0L
        }
      }

      val city = IpUtils.getCity(ip)
      val time = splits(0)
      val day = time.substring(0, 10).replaceAll("-", "")
      // the Row fields must line up with struct
      Row(url, classType, classId, traffic, ip, city, time, day)
    } catch {
      case e: Exception => Row("", "", 0L, 0L, "", "", "", "")
    }
  }
}
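A quick local check of parseLog; in this sketch the traffic value (54) is made up, the IP reuses the address tested in IpUtils, and the time and URL come from the samples above:
package log

object AccessConvertUtilApp {
  def main(args: Array[String]): Unit = {
    // one line in the first-pass output format: time \t url \t traffic \t ip
    val sample = "2016-11-10 00:01:02\thttp://www.imooc.com/video/678\t54\t218.75.35.226"
    // expected shape: [http://www.imooc.com/video/678,video,678,54,218.75.35.226,<city>,2016-11-10 00:01:02,20161110]
    println(AccessConvertUtil.parseLog(sample))
  }
}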
SparkStatCleanJob.scala
package log
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
 * Second cleansing pass implemented with Spark
*/
object SparkStatCleanJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkStatCleanJob").master("local[2]").getOrCreate()
    val accessRDD = spark.sparkContext.textFile("/Users/Mac/testdata/access.log")
    val accessDF = spark.createDataFrame(accessRDD.map(x => AccessConvertUtil.parseLog(x)),
      AccessConvertUtil.struct)
    // accessDF.printSchema()
    // accessDF.show(false)
    // coalesce: control the number of output files (one per partition by default)
    accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite)
      .partitionBy("day").save("/Users/Mac/testdata/log_clean/")
    spark.stop()
  }
}
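With the cleaned Parquet data in place, requirement 1 (the Top N most-visited courses) reduces to a grouped count over the video pages. A minimal sketch; the object name, the day value 20161110, and the show call are illustrative:
package log
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object TopNStatJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TopNStatJob").master("local[2]").getOrCreate()
    // read the partitioned Parquet output of SparkStatCleanJob
    val accessDF = spark.read.format("parquet").load("/Users/Mac/testdata/log_clean/")
    // requirement 1: most-visited video courses for one day, ordered by visit count
    val topN = accessDF.filter(col("day") === "20161110" && col("classType") === "video")
      .groupBy("day", "classId")
      .agg(count("classId").as("times"))
      .orderBy(desc("times"))
    topN.show(10, truncate = false)
    spark.stop()
  }
}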