文章目录
- 一、项目分析
- 1.数据集结构
- 2.业务场景
- 3.处理过程使用的技术点
- 二、流程分析
- 1.理解数据集
- 2.理解需求和结果集
- 3.反推每一个步骤
- 三、步骤分析
- 1.读取数据集
- 2.数据清洗
- 2.1 数据转换 DataFrame:DataSet[Row] => DataSet[Trip]
- ① 定义Trip样例类
- ② 转换数据
- ⑴ 定义转换对象方法: RDD[Row] => RDD[Trip]
- ⑵ 转换时间类型数据
- ⑶ 转换地点数据
- ⑷ 包装Row处理空值 --- 返回option
- ⑸ 异常处理
- ★ 简单案例解释Either作用
- 2.2 剪除异常数据
- ① 查询异常数据 ---- 统计时间分布直方图
- ◆ 使用UDF
- ▶ 剪除异常数据 ---- 绘制时长直方图
- ▶ 根据直方图的显示,查看数据分布后,剪除反常数据,保留时长 0-3h 的数据集
- 3.行政区信息
- 3.1 求出borough
- ① 读取行政区位置信息
- ▲GeoJSON
- 地理位置的表示
- 行政区范围数据集 .geojson
- 使用步骤
- ▲GeoJSON 解析工具
- ▲JSON4S介绍
- 补充:JSON解析
- 配置Maven依赖
- ▲具体实现
- Step 1: 创建目标类
- Step 2: 将 JSON 字符串解析为目标类对象
- Step 3:解析 GeoJSON ---- 转换 JSON 为 Geometry 对象
- Step 4: 读取数据集, 转换数据
- ② 将 Geometry 数据集按照区域大小排序
- ③ 广播 Geometry 信息, 发给每一个 Executor
- ④ 创建 UDF, 通过经纬度获取行政区信息
- ⑤ 统计行政区信息
- 4.会话统计
- 4.1 需求分析
- ▲具体实现
- Step 1: 过滤没有经纬度的数据、划分会话
- Step 2: 求得时间差
- Step 3: 统计数据
一、项目分析
1.数据集结构
声明:该案例来自学习记录,数据集可能略有差异
主要列:

主要描述:
数据意义:
- 每一行数据 -> 一次行程 ->哪个出租车的:上下车时间,上下车位置
返回顶部
2.业务场景
在网约车出现之前,出行很大一部分要靠出租车和公共交通。所以经常会见到一些情况,比如说从东直门打车告诉师傅要去昌平,可能拒载。这种情况所凸显的是一个出租车调度的难题。所以需要先通过数据来看到问题,后解决问题。
所以要统计出租车利用率也就是有乘客乘坐的时间,和无乘客空跑的时间比例。这是一个理解出租车的重要指标,最响利用率的一个因素就是目的地,比如说去昌平,可能出租车师傅不确定自己是否要空放回来;而去国贸,下车几分钟内,一定能有新的顾客上。
而统计利用率的时候需要用到时间数据和空间据来进行计算,对于时计算来说, SparkSQL提供了很多工具和函数可以使用,而空间计算仍然是一个比较专业的场景,需要使用到第三方库。
我们的需求是在上述的数据集中,根据时间算出等待时间;根据地点落地到某个区,算出某个区的平均等待时间,也就是这个下车地点对于出租车利用率的影响。
返回顶部
3.处理过程使用的技术点
-
1.数据清洗
数据清洗在几乎所有类型的项目中都会遇到处理数据的类型,处理空值等问题
-
2.JSON解析
JSON解析在大部分业务系统的数据分析中都会用到如何读取JSON数据,如何把JSON数据变为可以使用的对象数据
-
3.地理位置信息处理
地理位置信息的处理是一个比较专业的场景在一些粗车网站,或者像滴滴,Uber之类的出行服务上,也经常会处理地理位置信息
-
4.探索性数据分析
从拿到一个数据集明确需求以后,如何逐步了解数据集如何从数据集中探索对应的内容等,是一个数据工程师的基本素质
-
5.会话分析
会话分析用于识别同一个用户的多个操作之间的关联,是分析系统常见的分析模式在电商和搜索引擎中非常常见
返回顶部
二、流程分析
1.理解数据集
- 首先要理解数据集,要回答自己一些问题:
-
这个数据集是否以行作为单位?是否是 DataFrame可以处理的?
大部分情况下都是
-
这个数据集每行记录所代表的实体对象是什么?
例如:出租车的载客记录
-
表达这个实体对象的最核心字段是什么?
例如:上下车地点和时间唯一标识一辆车的 License(id)
2.理解需求和结果集
- 第一点,理解需求再动手绝对不会浪费时间;第二点,在数据分析的任务中,如何无法理解需求可能根本无从动手
- 我们的需求是:出租车在某个地点的平均等待客人时间
- 简单来说,结果集中应该有的列地点平均等待时间
3.反推每一个步骤
结果集中,应该有的字段有两个,一个是地点,一个是等待时间
地点如何获知?
其实就是乘客的下车点,但是是一个坐标如何得到其在哪个区?等待时间如何获知?
其实就是上一个乘客下车到下一个乘客上车之间的时间通过这两个时间的差值便可获知。
返回顶部
三、步骤分析
1.读取数据集
- 数居集很大所以我截取了一小部分,大概百分之一左右如果大家感兴趣的话,可以将完整数据集放在集群中,使用集群来计算大数据
// 1.读取数据集
val source = spark.read
.option("header", value = true)
.csv("G:\\Projects\\IdeaProjects\\Spark\\dataset\\train.csv")
source.show() // 展示数据集
source.printSchema() // 打印结构信息
在读取了数据集后,可以发现在结构信息中的上下车时间字段数据类型为字符串类型数据,显然需要进行转换。并且读取的数据中有些列属于无关信息,需要剔除。
返回顶部
2.数据清洗
- 数据集当中的某些列名可能使用起来不方便或者数据集当中某些列的值类型可能不对,或者数据集中有可能存在缺失值,这些都是要清洗的动机。
2.1 数据转换 DataFrame:DataSet[Row] => DataSet[Trip]
通过
DataFramoReader
读取出来的数据集是
DataFrame(DataSet[Row])
,而 Dataframe中保存的是Row对象.但是
后续我们在进行处理的时候可能要使用到一些有类型的转换
,也需
要每一列数据对应自己的数据类型
.所以需要将Row所代表的弱类型对象转为Trip这样的强类型对象,而Trip对象则是一个样例类用于代表一个出租车的行程.
① 定义Trip样例类
// 4.Trip 样例类
case class Trip(
id: String, // 出租车执照
pickUpTime: Long, // 上车时间
dropOffTime: Long, // 下车时间
pickUpX: Double, // 上车地经度
pickUpY: Double, // 上车点纬度
dropOffX: Double, // 下车点经度
dropOffY: Double // 下车点纬度
)
返回顶部
② 转换数据
// DataFrame:DataSet[Row] => DataSet[Trip]
source.rdd.map(。。。)
⑴ 定义转换对象方法: RDD[Row] => RDD[Trip]
// 5.1定义转换对象方法: RDD[Row] => RDD[Trip]
def prase(row: Row): Trip = {
// 创建RichRow对象
val richRow = new RichRow()
// orNull 方法 --- 若前面有值则返回前面的值,若没有值则返回Null;即使没有值时也便于后期对空值的处理
val id = richRow.getAs[String]("id").orNull
val pickUpTime = parseTime(richRow, "pickup_datetime")
val dropOffTime = parseTime(richRow, "dropoff_datetime")
val pickUpX = parseLocation(richRow, "pickup_longitude")
val pickUpY = parseLocation(richRow, "pickup_latitude")
val dropOffX = parseLocation(richRow, "dropoff_longitude")
val dropOffY = parseLocation(richRow, "dropoff_latitude")
// 最终转化返回的Trip对象
Trip(id, pickUpTime, dropOffTime, pickUpX, pickUpY, dropOffX, dropOffY)
}
返回顶部
⑵ 转换时间类型数据
// 5.1.2 转换时间类型数据
def parseTime(row: RichRow, field: String): Long = {
// 1.转换出来时间类型的格式 SimpleDateFormat
val pattern = "yyyy-MM-dd HH:mm:ss"
val formatter = new SimpleDateFormat(pattern,Locale.ENGLISH)
// 2.执行转化,获取Date对象,getTime获取时间戳
val time = row.getAs[String](field)
// Option的map算子只有当传入的函数由内容时才会执行函数
val timeOption = time.map(time => formatter.parse(time).getTime)
// 返回值声明 --- 有值则返回,无值则返回0L
timeOption.getOrElse(0L)
}
返回顶部
⑶ 转换地点数据
// 5.1.3 转换地点数据
def parseLocation(row: RichRow, field: String): Double = {
// 1.获取对应的数据
val location = row.getAs[String](field)
// 2.转换数据
val locationOption = location.map( loc => loc.toDouble)
locationOption.getOrElse(0.0D)
}
返回顶部
⑷ 包装Row处理空值 — 返回option
// 5.1.1 包装Row处理空值 --- 返回option
class RichRow(row: Row) {
// 重写getAs方法
// 指定泛型 --- 得到指定类型的数据
// 指定参数 --- 传入的形参
/**
* 为了返回option提醒外部处理空值,提供处理方式
* 一个方法返回了Option代表这个方法结果有可能为空,使得方法调用处必须处理null的情况
* Option本身提供了一些处理null的方法
* @param field
* @tparam T
* @return
*/
def getAs[T](field: String): Option[T] = {
// 1.判断row.getAs的内容是否为空
if (row.isNullAt(row.fieldIndex(field))) {
// 2.null -> 返回None
None
} else {
// 3.noe null -> 返回 Some
Some(row.getAs[T](field))
}
}
}
返回顶部
⑸ 异常处理
对于返回结果的并列性,Scala中提供了一个类似于其它语言中多返回值的Either。
Either分为两种情况,一个是left,一个是right,左右两个结果代表的意思可以由用户来决定,也就是自定义。
★ 简单案例解释Either作用
/**
* 封装 parse 方法,捕获异常
* .map(safe(prase)) map接收的是一个函数,所以safe封装了prase方法后返回的还应该是一个函数
* @param function 参数为需要被封装的方法,返回值类型为一个Either对象
* @tparam P 泛型P代表参数类型,表示sourceRdd中的每一条数据类型
* @tparam R 泛型R代表返回值类型,表示处理后的数据类型
* @return
*/
def safe[P,R](function:P => R):P => Either[R,(P,Exception)] = {
new Function[P,Either[R,(P,Exception)]] with Serializable{
override def apply(param: P): Either[R, (P, Exception)] = { // 包裹逻辑要在apply方法中重写
try{
Left(function(param))
} catch {
case e: Exception => Right((param,e))
}
}
}
}
}
优化后调用数据转换方法:
通过异常处理后得到的是一个Either类型的对象,包含正常以及异常值
// 对结果进行判断
// 通过以下方式可以过滤出所有异常的row
// val result = transform.filter( e => e.isRight)
// .map( e => e.right.get._1)
// 5.2 将结果对象转为left
val taxi_clean: Dataset[Trip] = transform.map(either => either.left.get).toDS()
利用map算子将结果对象转换为正常结果 — 获取left部分数据集
返回顶部
2.2 剪除异常数据
① 查询异常数据 ---- 统计时间分布直方图
◆ 使用UDF
▶ 剪除异常数据 ---- 绘制时长直方图
- 上车时长 = (dropOffTime: Long下车时间 - pickUpTime: Long上车时间)
- 单位转换 ms 显然不合适,转为 hour
// 6.1 编写程序 --- UDF, 将毫秒转换为小时单位
val hours = (dropOffTime: Long, pickUpTime:Long) => {
// 计算时长
val duration = dropOffTime - pickUpTime
// 转换时长 --- hour,并返回结果
TimeUnit.HOURS.convert(duration,TimeUnit.MILLISECONDS)
}
// 生成UDF函数
val hoursUDF = udf(hours)
// 6.2 进行时长统计 至少需要两列:时长、计数 ---> 分组聚合
taxi_clean.groupBy(hoursUDF('dropOffTime,'pickUpTime) as "duration")
.count()
.sort('duration desc )
.show(100)
▶ 根据直方图的显示,查看数据分布后,剪除反常数据,保留时长 0-3h 的数据集
// 6.3.1 注册UDF函数
spark.udf.register("hours",hours)
// 6.3.2 利用注册的UDF函数剪除数据
val taxi_clean_drop = taxi_clean.where("hours(dropOffTime,pickUpTime) BETWEEN 0 AND 3")
taxi_clean_drop.show()
返回顶部
3.行政区信息
- 由于最终要统计的结果是按照区域作为单位,而不是一个具体的目的地点所以要在数据集中增加列中放置区域信息
- 1.既然是放置行政区名字,应该现有行政区以及其边界的信息
- 2.通过上下车的坐标点,可以判断是否存在于某个行政区中
- 这些判断坐标点是否属于某个区域,这些信息就是专业的领域了
○ 源数据集
返回顶部
3.1 求出borough
① 读取行政区位置信息
▲GeoJSON
地理位置的表示
行政区范围数据集 .geojson
使用步骤
返回顶部
▲GeoJSON 解析工具
- 介绍
-
【SparkSQL 项目】出租乘客行程记录分析 - 使用
-
【SparkSQL 项目】出租乘客行程记录分析
返回顶部
▲JSON4S介绍
一般在 Java 中, 常使用如下三个工具解析 JSON
-
Gson
Google 开源的 JSON 解析工具, 比较人性化, 易于使用, 但是性能不如 Jackson, 也不如 Jackson 有积淀
-
Jackson
Jackson 是功能最完整的 JSON 解析工具, 也是最老牌的 JSON 解析工具, 性能也足够好, 但是 API 在一开始支持的比较少, 用起来稍微有点繁琐
-
FastJson
阿里巴巴的 JSON 开源解析工具, 以快著称, 但是某些方面用起来稍微有点反直觉
补充:JSON解析
【SparkSQL】扩展 ---- JSON解析
配置Maven依赖
<!--地理位置处理库 -->
<!-- https://mvnrepository.com/artifact/com.esri.geometry/esri-geometry-api -->
<dependency>
<groupId>com.esri.geometry</groupId>
<artifactId>esri-geometry-api</artifactId>
<version>2.2.2</version>
</dependency>
<!--json.json 解析库-->
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-native_2.11</artifactId>
<version>3.2.11</version>
</dependency>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_2.11</artifactId>
<version>3.2.11</version>
</dependency>
返回顶部
▲具体实现
- GeoJSON
-
【SparkSQL 项目】出租乘客行程记录分析 - 目标类
-
【SparkSQL 项目】出租乘客行程记录分析 - 步骤:
- 对照 JSON 中的格式, 创建解析的目标类
- 解析 JSON 数据转为目标类的对象
- 读取数据集, 执行解析
Step 1: 创建目标类
case class FeatureCollection(features:List[Feature])
case class Feature(properties:Map[String,String],geometry: JObject)
返回顶部
Step 2: 将 JSON 字符串解析为目标类对象
object FeatureExtraction{
// 完成具体的解析工作
def parseJson(json:String):FeatureCollection = {
// 1.导入一个format 隐式转换
implicit val formats = Serialization.formats(NoTypeHints)
// 2.Json ->object
val featureCollection = read[FeatureCollection](json)
featureCollection
}
}
返回顶部
Step 3:解析 GeoJSON ---- 转换 JSON 为 Geometry 对象
case class Feature(properties:Map[String,String],geometry: JObject){
def getGeometry:Geometry = {
// Geometry 对象 实现
import org.json4s.jackson.JsonMethods._
val mapGeo = GeometryEngine.geoJsonToGeometry(compact(render(geometry)),0,Geometry.Type.Unknown)
mapGeo.getGeometry}
}
返回顶部
Step 4: 读取数据集, 转换数据
// 7.1 读取数据集
val geoJson = Source.fromFile("dataset/nyc-borough-boundaries-polygon.geojson").mkString
val featureCollection = FeatureExtraction.parseJson(geoJson)
返回顶部
② 将 Geometry 数据集按照区域大小排序
后续需要得到每一个出租车在哪个行政区域,拿到经纬度,遍历feature.行政区范围大的命中率高,在后续筛选的时候减少遍历次数。
// 7.2 排序
val areaSortedFeatures = featureCollection.features.sortBy(
feature => {
(feature.properties("boroughCode"), -feature.getGeometry.calculateArea2D())
}
)
返回顶部
③ 广播 Geometry 信息, 发给每一个 Executor
/ 7.3 广播 --- 减少数据集的拷贝量
val featureBC = spark.sparkContext.broadcast(areaSortedFeatures)
返回顶部
④ 创建 UDF, 通过经纬度获取行政区信息
// 7.4 UDF创建,完成功能
val boroughLookUp = (x: Double, y: Double) => {
// 7.4.1 搜索经纬度所在的行政区
val featureHit = featureBC.value.find(feature => {
// 判断是否包含
GeometryEngine.contains(feature.getGeometry, new Point(x, y), SpatialReference.create(4326))
})
// 7.4.2 转为行政区信息
val borough = featureHit.map(feature => feature.properties("borough")).getOrElse("NA")
borough
}
返回顶部
⑤ 统计行政区信息
// 7.5 统计信息
val boroughUDF = udf(boroughLookUp)
taxi_clean_drop.groupBy(boroughUDF('dropOffX,'dropOffY))
.count()
.show()
返回顶部
4.会话统计
- 数据集中存在很多出租车师傅的数据,所以如何将某个师傅的记录发往一个分区,在这个分区上完成会话分析呢?这也是一个需要理解的点
4.1 需求分析
目标:统计每个行政区的平均等客时间
- 需求可以拆分为如下几个步骤:
- 按照行政区分组
- 在每一个行政区中, 找到同一个出租车司机的先后两次订单, 本质就是再次针对司机的证件号再次分组
- 求出这两次订单的下车时间和上车时间之差, 便是等待客人的时间
- 针对一个行政区, 求得这个时间的平均数
- 问题: 分组效率太低
- 分组的效率相对较低
- 分组是 Shuffle
- 两次分组, 包括后续的计算, 相对比较复杂
- 解决方案: 分区后在分区中排序
- 按照 License(id) 重新分区, 如此一来, 所有相同的司机的数据就会在同一个分区中
- 计算分区中连续两条数据的时间差
-
【SparkSQL 项目】出租乘客行程记录分析
无论是刚才的多次分组, 还是后续的分区, 都是要找到每个司机的会话, 通过会话来完成功能, 也叫做会话分析。
返回顶部
▲具体实现
Step 1: 过滤没有经纬度的数据、划分会话
// 8.1 过滤没有经纬度的数据
// 8.2 按照分区的id、上车时间进行排序
val sessions =taxi_clean_drop.where("dropOffX!=0 and dropOffY!=0 and pickUpX!=0 and pickUpY!=0")
.repartition('id)
.sortWithinPartitions('id,'pickUpTime)
返回顶部
Step 2: 求得时间差
// 8.3 获取时间差
def boroughDuration(t1:Trip,t2:Trip):(String,Long) = {
val borough = boroughLookUp(t1.dropOffX,t1.dropOffY)
val duration = (t2.dropOffTime - t1.pickUpTime)/1000
(borough,duration)
}
// 得到含有时间差、地区信息的数据集
val BoroughDuration = sessions.mapPartitions(trips => {
// 找到长度为2的窗口,每次移动一个单位
val viter = trips.sliding(2)
.filter(_.size == 2) // 过滤调数据不为奇数的
.filter( p => p.head.id == p.last.id) // 判定分区中的出租车id为同一个
viter.map( p => boroughDuration(p.head,p.last))
}).toDF("borough","seconds")
返回顶部
Step 3: 统计数据
// 分组聚合
BoroughDuration.where("seconds > 0")
.groupBy("borough")
.agg(avg('seconds),stddev('seconds))
.show()