天天看點

spark 大型項目實戰(二十二):使用者通路session分析(二十二) --session随機抽取之按時間比例随機抽取算法實作

項目源碼:https://github.com/haha174/spark-session.git

大緻步驟如下:

第一步,計算出每天每小時的session數量,擷取<yyyy-MM-dd_HH,sessionid>格式的RDD
第二步,使用按時間比例随機抽取算法,計算出每天每小時要抽取session的索引,将<yyyy-MM-dd_HH,count>格式的map,轉換成<yyyy-MM-dd,<HH,count>>的格式
第三步,總共要抽取100個session,先按照天數,進行平分
第四步,計算每個小時的session數量,占據當天總session數量的比例,直接乘以每天要抽取的數量就可以計算出,目前小時需要抽取的session數量
第五步,生成上面計算出來的數量的随機數獲得相應的session(在下一篇中實作)
           

代碼實作如下

/**
     * 随機抽取session
     * @param sessionid2AggrInfoRDD
     */
    private static void randomExtractSession(
            JavaPairRDD<String, String> sessionid2AggrInfoRDD) {
        // 第一步,計算出每天每小時的session數量,擷取<yyyy-MM-dd_HH,sessionid>格式的RDD
        JavaPairRDD<String, String> time2sessionidRDD = sessionid2AggrInfoRDD.mapToPair(

                new PairFunction<Tuple2<String,String>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(
                            Tuple2<String, String> tuple) throws Exception {
                        String aggrInfo = tuple._2;

                        String startTime = StringUtils.getFieldFromConcatString(
                                aggrInfo, "\\|", Constants.SESSION_PROJECT.FIELD_START_TIME);
                        String dateHour = DateUtils.getDateHour(startTime);

                        return new Tuple2<String, String>(dateHour, aggrInfo);
                    }

                });

        /**
         * 思考一下:這裡我們不要着急寫大量的代碼
         *
         * 每天每小時的session數量,然後計算出每天每小時的session抽取索引,周遊每天每小時session
         * 首先抽取出的session的聚合資料,寫入session_random_extract表
         * 是以第一個RDD的value,應該是session聚合資料
         *
         */

        // 得到每天每小時的session數量
        Map<String, Long> countMap = time2sessionidRDD.countByKey();

        // 第二步,使用按時間比例随機抽取算法,計算出每天每小時要抽取session的索引

        // 将<yyyy-MM-dd_HH,count>格式的map,轉換成<yyyy-MM-dd,<HH,count>>的格式
        Map<String, Map<String, Long>> dateHourCountMap =
                new HashMap<String, Map<String, Long>>();

        for(Map.Entry<String, Long> countEntry : countMap.entrySet()) {
            String dateHour = countEntry.getKey();
            String date = dateHour.split("_")[0];
            String hour = dateHour.split("_")[1];

            long count = Long.valueOf(String.valueOf(countEntry.getValue()));

            Map<String, Long> hourCountMap = dateHourCountMap.get(date);
            if(hourCountMap == null) {
                hourCountMap = new HashMap<String, Long>();
                dateHourCountMap.put(date, hourCountMap);
            }

            hourCountMap.put(hour, count);
        }

        // 開始實作我們的按時間比例随機抽取算法

        // 總共要抽取100個session,先按照天數,進行平分
        int extractNumberPerDay = 100 / dateHourCountMap.size();

        // <date,<hour,(3,5,20,102)>>
        Map<String, Map<String, List<Integer>>> dateHourExtractMap =
                new HashMap<String, Map<String, List<Integer>>>();

        Random random = new Random();

        for(Map.Entry<String, Map<String, Long>> dateHourCountEntry : dateHourCountMap.entrySet()) {
            String date = dateHourCountEntry.getKey();
            Map<String, Long> hourCountMap = dateHourCountEntry.getValue();

            // 計算出這一天的session總數
            long sessionCount = 0L;
            for(long hourCount : hourCountMap.values()) {
                sessionCount += hourCount;
            }

            Map<String, List<Integer>> hourExtractMap = dateHourExtractMap.get(date);
            if(hourExtractMap == null) {
                hourExtractMap = new HashMap<String, List<Integer>>();
                dateHourExtractMap.put(date, hourExtractMap);
            }

            // 周遊每個小時
            for(Map.Entry<String, Long> hourCountEntry : hourCountMap.entrySet()) {
                String hour = hourCountEntry.getKey();
                long count = hourCountEntry.getValue();

                // 計算每個小時的session數量,占據當天總session數量的比例,直接乘以每天要抽取的數量
                // 就可以計算出,目前小時需要抽取的session數量
                int hourExtractNumber = (int)(((double)count / (double)sessionCount)
                        * extractNumberPerDay);
                if(hourExtractNumber > count) {
                    hourExtractNumber = (int) count;
                }

                // 先擷取目前小時的存放随機數的list
                List<Integer> extractIndexList = hourExtractMap.get(hour);
                if(extractIndexList == null) {
                    extractIndexList = new ArrayList<Integer>();
                    hourExtractMap.put(hour, extractIndexList);
                }

                // 生成上面計算出來的數量的随機數
                for(int i = 0; i < hourExtractNumber; i++) {
                    int extractIndex = random.nextInt((int) count);
                    while(extractIndexList.contains(extractIndex)) {
                        extractIndex = random.nextInt((int) count);
                    }
                    extractIndexList.add(extractIndex);
                }
            }
        }
    }
     /**
         * 第四步:擷取抽取出來的session的明細資料
         */
        JavaPairRDD<String, Tuple2<String, Row>> extractSessionDetailRDD =
                extractSessionidsRDD.join(sessionid2actionRDD);
        extractSessionDetailRDD.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
                Row row = tuple._2._2;

                SessionDetail sessionDetail = new SessionDetail();
                sessionDetail.setTaskid(taskid);
                sessionDetail.setUserid(row.getLong(0));
                sessionDetail.setSessionid(row.getString(1));
                sessionDetail.setPageid(row.getLong(2));
                sessionDetail.setActionTime(row.getString(3));
                sessionDetail.setSearchKeyword(row.getString(4));
                sessionDetail.setClickCategoryId(row.getLong(5));
                sessionDetail.setClickProductId(row.getLong(6));
                sessionDetail.setOrderCategoryIds(row.getString(7));
                sessionDetail.setOrderProductIds(row.getString(8));
                sessionDetail.setPayCategoryIds(row.getString(9));
                sessionDetail.setPayProductIds(row.getString(11));

                ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO();
                sessionDetailDAO.insert(sessionDetail);
            }
        });
           

歡迎關注,更多驚喜等着你

spark 大型項目實戰(二十二):使用者通路session分析(二十二) --session随機抽取之按時間比例随機抽取算法實作

繼續閱讀