項目源碼:https://github.com/haha174/spark-session.git
大緻步驟如下:
第一步,計算出每天每小時的session數量,擷取<yyyy-MM-dd_HH,sessionid>格式的RDD
第二步,使用按時間比例随機抽取算法,計算出每天每小時要抽取session的索引,将<yyyy-MM-dd_HH,count>格式的map,轉換成<yyyy-MM-dd,<HH,count>>的格式
第三步,總共要抽取100個session,先按照天數,進行平分
第四步,計算每個小時的session數量,占據當天總session數量的比例,直接乘以每天要抽取的數量就可以計算出,目前小時需要抽取的session數量
第五步,生成上面計算出來的數量的随機數獲得相應的session(在下一篇中實作)
代碼實作如下
/**
* 随機抽取session
* @param sessionid2AggrInfoRDD
*/
private static void randomExtractSession(
JavaPairRDD<String, String> sessionid2AggrInfoRDD) {
// 第一步,計算出每天每小時的session數量,擷取<yyyy-MM-dd_HH,sessionid>格式的RDD
JavaPairRDD<String, String> time2sessionidRDD = sessionid2AggrInfoRDD.mapToPair(
new PairFunction<Tuple2<String,String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(
Tuple2<String, String> tuple) throws Exception {
String aggrInfo = tuple._2;
String startTime = StringUtils.getFieldFromConcatString(
aggrInfo, "\\|", Constants.SESSION_PROJECT.FIELD_START_TIME);
String dateHour = DateUtils.getDateHour(startTime);
return new Tuple2<String, String>(dateHour, aggrInfo);
}
});
/**
* 思考一下:這裡我們不要着急寫大量的代碼
*
* 每天每小時的session數量,然後計算出每天每小時的session抽取索引,周遊每天每小時session
* 首先抽取出的session的聚合資料,寫入session_random_extract表
* 是以第一個RDD的value,應該是session聚合資料
*
*/
// 得到每天每小時的session數量
Map<String, Long> countMap = time2sessionidRDD.countByKey();
// 第二步,使用按時間比例随機抽取算法,計算出每天每小時要抽取session的索引
// 将<yyyy-MM-dd_HH,count>格式的map,轉換成<yyyy-MM-dd,<HH,count>>的格式
Map<String, Map<String, Long>> dateHourCountMap =
new HashMap<String, Map<String, Long>>();
for(Map.Entry<String, Long> countEntry : countMap.entrySet()) {
String dateHour = countEntry.getKey();
String date = dateHour.split("_")[0];
String hour = dateHour.split("_")[1];
long count = Long.valueOf(String.valueOf(countEntry.getValue()));
Map<String, Long> hourCountMap = dateHourCountMap.get(date);
if(hourCountMap == null) {
hourCountMap = new HashMap<String, Long>();
dateHourCountMap.put(date, hourCountMap);
}
hourCountMap.put(hour, count);
}
// 開始實作我們的按時間比例随機抽取算法
// 總共要抽取100個session,先按照天數,進行平分
int extractNumberPerDay = 100 / dateHourCountMap.size();
// <date,<hour,(3,5,20,102)>>
Map<String, Map<String, List<Integer>>> dateHourExtractMap =
new HashMap<String, Map<String, List<Integer>>>();
Random random = new Random();
for(Map.Entry<String, Map<String, Long>> dateHourCountEntry : dateHourCountMap.entrySet()) {
String date = dateHourCountEntry.getKey();
Map<String, Long> hourCountMap = dateHourCountEntry.getValue();
// 計算出這一天的session總數
long sessionCount = 0L;
for(long hourCount : hourCountMap.values()) {
sessionCount += hourCount;
}
Map<String, List<Integer>> hourExtractMap = dateHourExtractMap.get(date);
if(hourExtractMap == null) {
hourExtractMap = new HashMap<String, List<Integer>>();
dateHourExtractMap.put(date, hourExtractMap);
}
// 周遊每個小時
for(Map.Entry<String, Long> hourCountEntry : hourCountMap.entrySet()) {
String hour = hourCountEntry.getKey();
long count = hourCountEntry.getValue();
// 計算每個小時的session數量,占據當天總session數量的比例,直接乘以每天要抽取的數量
// 就可以計算出,目前小時需要抽取的session數量
int hourExtractNumber = (int)(((double)count / (double)sessionCount)
* extractNumberPerDay);
if(hourExtractNumber > count) {
hourExtractNumber = (int) count;
}
// 先擷取目前小時的存放随機數的list
List<Integer> extractIndexList = hourExtractMap.get(hour);
if(extractIndexList == null) {
extractIndexList = new ArrayList<Integer>();
hourExtractMap.put(hour, extractIndexList);
}
// 生成上面計算出來的數量的随機數
for(int i = 0; i < hourExtractNumber; i++) {
int extractIndex = random.nextInt((int) count);
while(extractIndexList.contains(extractIndex)) {
extractIndex = random.nextInt((int) count);
}
extractIndexList.add(extractIndex);
}
}
}
}
/**
* 第四步:擷取抽取出來的session的明細資料
*/
JavaPairRDD<String, Tuple2<String, Row>> extractSessionDetailRDD =
extractSessionidsRDD.join(sessionid2actionRDD);
extractSessionDetailRDD.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
Row row = tuple._2._2;
SessionDetail sessionDetail = new SessionDetail();
sessionDetail.setTaskid(taskid);
sessionDetail.setUserid(row.getLong(0));
sessionDetail.setSessionid(row.getString(1));
sessionDetail.setPageid(row.getLong(2));
sessionDetail.setActionTime(row.getString(3));
sessionDetail.setSearchKeyword(row.getString(4));
sessionDetail.setClickCategoryId(row.getLong(5));
sessionDetail.setClickProductId(row.getLong(6));
sessionDetail.setOrderCategoryIds(row.getString(7));
sessionDetail.setOrderProductIds(row.getString(8));
sessionDetail.setPayCategoryIds(row.getString(9));
sessionDetail.setPayProductIds(row.getString(11));
ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO();
sessionDetailDAO.insert(sessionDetail);
}
});
歡迎關注,更多驚喜等着你
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZwpmLxEDOwUTM1EDNwgTMwIDOwUDO0kDN5ETM4MTNxQDOxAjMvwFcvRnL0cTMhhWYo5SZslmZvw1LcpDc0RHaiojIsJye.jpg)