天天看點

Spark統計熱榜(JAVA版)

-Spark實作熱榜的統計

package com.changhong.laodixiao.HotCalculate;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.SparkConf;

import scala.Tuple2;

public final class HotStatistics {
    private static Logger logger = Logger.getLogger(HotStatistics.class);

    public static void main(String[] args) {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//設定日期格式
        System.out.println(df.format(new Date()));

        String EPGInfoDir = "/data/laodi.xiao/zzzHotCal/getEPG/data/";
        String EPGInfoStarDir = "/data/laodi.xiao/zzzHotCal/getEPGStar/data/";
        String UserActionDataDir = "/data/laodi.xiao/zzzHotCal/getUserAction/data/"; 
        String OutputDir = "/data/laodi.xiao/zzzHotCal/output/data/";

        if(args.length==){
            EPGInfoDir = args[];
            EPGInfoStarDir = args[];
            UserActionDataDir = args[];
            OutputDir = args[];            
        }else {
            System.err.println("Usage: $JAVA_HOME/java -jar HotCalcualte.jar EPGInfoDir EPGInfoStarDir UserActionDataDir OutputDir");          
        }

        logger.info("<--------------------------------------------- Start Calculate Star Film List ...... ------------------------------->");
        HotStatistics.getStarFilmListUsingSpark(EPGInfoStarDir, OutputDir);
        logger.info("<--------------------------------------------- Star Film List Calculated ! ------------------------------->");

        logger.info("<--------------------------------------------- Start Calculate Best PlayMedia Lists for One Day ...... ------------------------------->");
        HotStatistics.getHotStatisticsForOneDayUsingSpark(EPGInfoDir, UserActionDataDir, OutputDir);
        logger.info("<--------------------------------------------- Best PlayMedia Lists for One Day Calculated ! ------------------------------->");

        System.out.println(df.format(new Date()));

    }


    public static void getStarFilmListUsingSpark(String ePGInfoStarDir, String outputDir) {
        logger.info("<--------------------------------------------- Step One: Initializing the SparkContext ...... ------------------------------->");
        SparkConf conf = new SparkConf().setMaster("local").setAppName("HotList statistics using Spark");
        JavaSparkContext sc = new JavaSparkContext(conf);
        logger.info("SparkContext initialized !");

        logger.info("<--------------------------------------------- Step Two: Reading data from " +ePGInfoStarDir+ " ...... ------------------------------->");
        // EPGInfoStar: star,cid,weight
        JavaRDD<String> EPGInfoStarRDD = sc.textFile(ePGInfoStarDir+"EPGStarInfo.txt");

        JavaPairRDD<String, Iterable<String>> starFilmList = EPGInfoStarRDD.distinct()
                .mapToPair( //JavaRDD<String>轉化為JavaPairRDD<Double,String> (weight, "star,cid,weight")
                        new PairFunction<String, Double, String>() {
                            private static final long serialVersionUID = L;
                            public Tuple2<Double, String> call(String arg0){                                
                                return new Tuple2<Double, String>(Double.parseDouble(arg0.split(",")[]), arg0);
                            }
                        }
                )
                .sortByKey(false) //按照權重排序
                .mapToPair(
                        new PairFunction<Tuple2<Double,String>, String, String>() {
                            private static final long serialVersionUID = L;
                            public Tuple2<String, String> call(Tuple2<Double, String> arg0){                                
                                return new Tuple2<String, String>(arg0._2().split(",")[], arg0._2().split(",")[]+","+arg0._2().split(",")[]);
                            }                           
                        }
                )
                .groupByKey()               
                .persist(StorageLevel.MEMORY_ONLY());

        //輸出統計結果
        starFilmList.repartition().saveAsTextFile(outputDir+"starFilmList/");

        sc.close();
    }


    /**
     * Computes the one-day hot lists and saves each one as text below {@code outputDir}.
     *
     * <p>Inputs are {@code ePGInfoDir + "EPGInfo.txt"} and
     * {@code userActionDataDir + "UserInfo01Day.txt"}. Per the inline format notes,
     * EPGInfo lines are {@code cid, player, categorys_name, model, vip} and user
     * records are {@code cid, mac, p_log_date}. User records are left-outer-joined
     * with EPG data on cid, and the following outputs are written:
     * <ul>
     *   <li>{@code oneDay/modelHot/tv} and {@code oneDay/modelHot/movie}</li>
     *   <li>{@code oneDay/vipHot}</li>
     *   <li>{@code oneDay/playerHot/tencent}</li>
     *   <li>{@code oneDay/tagsHot/}</li>
     * </ul>
     * A local SparkContext is created at the start and closed on the last line of the method.
     *
     * <p>NOTE(review): as shown, this method is missing a number of numeric literals
     * (for example {@code serialVersionUID = L}, {@code split(",", -)}, empty array
     * indices {@code []}, the empty second argument of {@code Tuple2}, the right-hand
     * side of {@code >} and {@code ==}, and the argument of {@code repartition()}).
     * It will not compile until they are restored from the original source.
     *
     * @param ePGInfoDir        directory prefix that "EPGInfo.txt" is appended to
     * @param userActionDataDir directory prefix that "UserInfo01Day.txt" is appended to
     * @param outputDir         directory prefix that the output sub-paths are appended to
     */
    public static void getHotStatisticsForOneDayUsingSpark(String ePGInfoDir, String userActionDataDir, String outputDir) {     
        logger.info("<--------------------------------------------- Step One: Initializing the SparkContext ...... ------------------------------->");
        SparkConf conf = new SparkConf().setMaster("local").setAppName("HotList statistics using Spark");
        JavaSparkContext sc = new JavaSparkContext(conf);
        logger.info("SparkContext initialized !");

        logger.info("<--------------------------------------------- Step Two: Reading data from ./data/* ...... ------------------------------->");
        // EPGInfo: cid, player, categorys_name, model, vip
        JavaRDD<String> EPGInfoRDD = sc.textFile(ePGInfoDir + "EPGInfo.txt");
        // UserInfoTans: cid, mac, p_log_date
        JavaRDD<String> UserInfoRDD = sc.textFile(userActionDataDir + "UserInfo01Day.txt");
        logger.info("Data prepared !");

        // Both inputs are reused by several pipelines below, so they are cached in memory.
        EPGInfoRDD.persist(StorageLevel.MEMORY_ONLY());
        UserInfoRDD.persist(StorageLevel.MEMORY_ONLY());


        // Key the UserInfo records to form a key-value PairRDD (by cid, per the original comment).
        // NOTE(review): the split limit and the field indices are missing in the lines below.
        PairFunction<String, String, String> USerKeyDataFunction = new PairFunction<String, String, String>() {
            private static final long serialVersionUID = L;
            public Tuple2<String, String> call(String x){       
                String[] tmp = x.split(",", -);
                return new Tuple2<String, String>(tmp[], tmp[]+","+tmp[]);
            }
        };      
        // Drop lines containing "GC" or failing the length check, key them, and de-duplicate.
        JavaPairRDD<String, String> userInfoPairRDD = UserInfoRDD
                                                        .filter(
                                                            new Function<String, Boolean>() {
                                                                private static final long serialVersionUID = L;
                                                                public Boolean call(String arg0)throws Exception {                                                                  
                                                                    // NOTE(review): the length threshold after '>' is missing.
                                                                    return (!arg0.contains("GC") && arg0.length()>);
                                                                }
                                                            }
                                                        )
                                                        .mapToPair(USerKeyDataFunction)
                                                        .distinct();


        logger.info("<--------------------------------------------- Step Three: Get the statistics ...... ------------------------------->"); 
        logger.info("\t\t<------------------------------------- SubStep Three-One: Get the 'model Best Visit List' statistics ...... ------------------------------->");
        // EPGInfo: cid, player, categorys_name, model, vip  EPGInfoRDD
        // UserInfoTans: cid, mac, p_log_date userInfoPairRDD

        // EPG records for tv (any line containing the substring "tv")
        JavaRDD<String> tvEPGInfoRDD = EPGInfoRDD.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = L;
            public Boolean call(String arg0) throws Exception {
                return arg0.contains("tv");
            }
        });     
        // EPG records for movie (any line containing the substring "movie")
        JavaRDD<String> movieEPGInfoRDD = EPGInfoRDD.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = L;
            public Boolean call(String arg0) throws Exception {
                return arg0.contains("movie");
            }
        });                                 

        // TV hot list: join user records with tv EPG data, count per joined key,
        // sort by count descending, then re-key and group.
        JavaPairRDD<String, Iterable<String>> tvHotRdd = userInfoPairRDD //UserInfoTans: cid,   mac, p_log_date
                .leftOuterJoin( // yields [cid, (mac, p_log_date, categorys_name)]
                    tvEPGInfoRDD //EPGInfo: cid, player, categorys_name, model, vip
                    .mapToPair(
                        new PairFunction<String, String, String>() {
                            private static final long serialVersionUID = L;
                            public Tuple2<String, String> call(String arg0)throws Exception {
                                String[] tmp = arg0.split(",", -);
                                return new Tuple2<String, String>(tmp[], tmp[]);
                            }
                        }
                    )
                )
                .mapToPair( // yields (cid,categorys_name,1)
                    new PairFunction<Tuple2<String,Tuple2<String,Optional<String>>>, String, Integer>() {
                        private static final long serialVersionUID = L;
                        public Tuple2<String, Integer> call(Tuple2<String, Tuple2<String, Optional<String>>> arg0)throws Exception {
                            // Any exception (presumably get() on an absent join value) maps the
                            // record to the "unknown" placeholder key instead of failing the job.
                            try{
                                return new Tuple2<String, Integer>(arg0._1+","+arg0._2._2.get(), );
                            }catch(Exception e){
                                return new Tuple2<String, Integer>("未知,未知", );
                            }   
                        }
                    }
                )
                .reduceByKey( // sum the counts per key
                    new Function2<Integer, Integer, Integer>() {
                        private static final long serialVersionUID = L;
                        public Integer call(Integer arg0, Integer arg1)throws Exception {                               
                            return arg0+arg1;
                        }   
                    }
                )
                .mapToPair( // swap to (count, key) so the count can be used as the sort key
                    new PairFunction<Tuple2<String,Integer>, Integer, String>() {
                        private static final long serialVersionUID = L;
                        public Tuple2<Integer, String> call(Tuple2<String, Integer> arg0)throws Exception {
                            return new Tuple2<Integer, String>(arg0._2, arg0._1);
                        }
                    }
                )
                .sortByKey(false) //(n, cid,categorys_name)
                .mapToPair( // re-key from the split string; the count is appended to the value
                    new PairFunction<Tuple2<Integer,String>, String, String>() {
                        private static final long serialVersionUID = L;
                        public Tuple2<String, String> call(Tuple2<Integer, String> arg0)throws Exception {
                            String[] tmp = arg0._2.split(",", -);
                            return new Tuple2<String, String>(tmp[], tmp[]+","+arg0._1.toString());
                        }

                    }
                )
                .groupByKey()
                .sortByKey(false)
                .persist(StorageLevel.MEMORY_ONLY());

        System.out.println("<------------------------------------ 電視劇熱榜 ------------------------------------------>");
        tvHotRdd.repartition().saveAsTextFile(outputDir + "oneDay/modelHot/tv");



        // Movie hot list: same pipeline as the TV hot list, fed from movieEPGInfoRDD.
        JavaPairRDD<String, Iterable<String>> movieHotRdd = userInfoPairRDD //UserInfoTans: cid,   mac, p_log_date
                .leftOuterJoin( // yields [cid, (mac, p_log_date, categorys_name)]
                    movieEPGInfoRDD //EPGInfo: cid, player, categorys_name, model, vip
                    .mapToPair(
                        new PairFunction<String, String, String>() {
                            private static final long serialVersionUID = L;
                            public Tuple2<String, String> call(String arg0)throws Exception {
                                String[] tmp = arg0.split(",", -);
                                return new Tuple2<String, String>(tmp[], tmp[]);
                            }
                        }
                    )
                )
                .mapToPair( // yields (cid,categorys_name,1)
                    new PairFunction<Tuple2<String,Tuple2<String,Optional<String>>>, String, Integer>() {
                        private static final long serialVersionUID = L;
                        public Tuple2<String, Integer> call(Tuple2<String, Tuple2<String, Optional<String>>> arg0)throws Exception {
                            // Same placeholder fallback as in the TV pipeline.
                            try{
                                return new Tuple2<String, Integer>(arg0._1+","+arg0._2._2.get(), );
                            }catch(Exception e){
                                return new Tuple2<String, Integer>("未知,未知", );
                            }   
                        }
                    }
                )
                .reduceByKey( // sum the counts per key
                    new Function2<Integer, Integer, Integer>() {
                        private static final long serialVersionUID = L;
                        public Integer call(Integer arg0, Integer arg1)throws Exception {                               
                            return arg0+arg1;
                        }   
                    }
                )
                .mapToPair( // swap to (count, key) for sorting
                    new PairFunction<Tuple2<String,Integer>, Integer, String>() {
                        private static final long serialVersionUID = L;
                        public Tuple2<Integer, String> call(Tuple2<String, Integer> arg0)throws Exception {
                            return new Tuple2<Integer, String>(arg0._2, arg0._1);
                        }
                    }
                )
                .sortByKey(false) //(n, cid,categorys_name)
                .mapToPair( // re-key from the split string; the count is appended to the value
                    new PairFunction<Tuple2<Integer,String>, String, String>() {
                        private static final long serialVersionUID = L;
                        public Tuple2<String, String> call(Tuple2<Integer, String> arg0)throws Exception {
                            String[] tmp = arg0._2.split(",", -);
                            return new Tuple2<String, String>(tmp[], tmp[]+","+arg0._1.toString());
                        }

                    }
                )
                .groupByKey()
                .sortByKey(false)
                .persist(StorageLevel.MEMORY_ONLY());
        System.out.println("<------------------------------------- 電影熱榜 ------------------------------------------->");
        movieHotRdd.repartition().saveAsTextFile(outputDir + "oneDay/modelHot/movie");





        logger.info("\t\t<------------------------------------- SubStep Three-Two: Get the 'VIP Best Visit List' statistics ...... ------------------------------->");      
        //recordVipPairRDD: cid,(mac, p_log_date, vip)
                JavaPairRDD<String, Tuple2<String, Optional<String>>> recordVipPairRDD = userInfoPairRDD
                        .leftOuterJoin(
                            EPGInfoRDD
                            .mapToPair(
                                    new PairFunction<String, String, String>() {
                                        private static final long serialVersionUID = L;
                                        public Tuple2<String, String> call(String x){               
                                            return new Tuple2<String, String>(x.split(",", -)[], x.split(",", -)[]);
                                        }
                                    }
                                )
                            .distinct()
                        );

        // cid,(mac, p_log_date, vip)  
        JavaPairRDD<Integer, String> vipHotRdd = recordVipPairRDD
                .mapValues( // yields JavaPairRDD<String,String> (cid,vip)
                        new Function<Tuple2<String, Optional<String>>, Integer>() {
                            private static final long serialVersionUID = L;
                            public Integer call(Tuple2<String, Optional<String>> value) {
                                // A missing or non-numeric vip value becomes -1.
                                try {
                                    return Integer.parseInt(value._2().get());
                                } catch (Exception e) {
                                    return Integer.parseInt("-1");
                                }
                                }
                            }
                        )
                .filter( // keep VIP videos only
                        new Function<Tuple2<String, Integer>, Boolean>() {
                            private static final long serialVersionUID = L;
                            public Boolean call(Tuple2<String, Integer> arg0) {
                                // NOTE(review): the vip flag value compared against is missing after '=='.
                                return (arg0._2()==);
                            }
                        }
                    )
                .mapToPair( // convert JavaPairRDD<String, Integer> (cid, vip) to JavaPairRDD<String, Integer> (cid, 1)
                        new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                            private static final long serialVersionUID = L;
                            public Tuple2<String, Integer> call(Tuple2<String, Integer> arg0) {
                                return new Tuple2<String, Integer>(arg0._1(), );
                            }
                        }
                    )
                .reduceByKey( // sum the (cid, 1) pairs by key
                        new Function2<Integer, Integer, Integer>() {
                            private static final long serialVersionUID = L;
                            public Integer call(Integer arg0, Integer arg1) throws Exception {
                                return arg0+arg1;
                            }
                        }
                    )
                .mapToPair( // swap to (count, cid) for sorting
                        new PairFunction<Tuple2<String,Integer>, Integer, String>() {
                            private static final long serialVersionUID = L;
                            public Tuple2<Integer,String> call(Tuple2<String,Integer> modelHoTuple2){
                                return new Tuple2<Integer,String>(modelHoTuple2._2(),modelHoTuple2._1());
                            }
                        }
                    )
                .sortByKey(false)
                .persist(StorageLevel.MEMORY_ONLY());


        System.out.println("<------------------------------------ VIP熱榜 ------------------------------------------>");
        vipHotRdd.repartition().saveAsTextFile(outputDir + "oneDay/vipHot");
//      ListIterator<Tuple2<Integer, String>> vipHotRddRows = vipHotRdd.collect().listIterator();
//      while(vipHotRddRows.hasNext()){
//          Tuple2<Integer, String> dInteger = vipHotRddRows.next();
//              System.out.println(dInteger);
//      }



        logger.info("\t\t<------------------------------------- SubStep Three-Three: Get the 'player Best Visit List' statistics ...... ------------------------------->");     
        //EPGInfo: cid, player, categorys_name, model, vip
        //UserInfoTans: cid, mac, p_log_date
        //recordPlayerPairRDD: cid,(mac, p_log_date, player)
                JavaPairRDD<String, Tuple2<String, Optional<String>>> recordPlayerPairRDD = userInfoPairRDD
                        .leftOuterJoin(
                            EPGInfoRDD
                            .mapToPair(
                                    new PairFunction<String, String, String>() {
                                        private static final long serialVersionUID = L;
                                        public Tuple2<String, String> call(String x){               
                                            return new Tuple2<String, String>(x.split(",", -)[], x.split(",", -)[]);
                                        }
                                    }
                                )
                            .distinct()
                        );

        // cid,(mac, p_log_date, player)  
        JavaPairRDD<String, Integer> playerHotRdd = recordPlayerPairRDD
                .mapValues( // yields JavaPairRDD<String,String> (cid,player)
                        new Function<Tuple2<String, Optional<String>>, String>() {
                            private static final long serialVersionUID = L;
                            public String call(Tuple2<String, Optional<String>> value) {
                                // Falls back to the "unknown" placeholder when get() throws.
                                try {
                                    return value._2().get();
                                } catch (Exception e) {
                                    return "未知";
                                }
                                }
                            }
                        )
                .map( // convert the JavaPairRDD to JavaRDD<String> "player,cid"
                        new Function<Tuple2<String,String>,String>() {
                            private static final long serialVersionUID = L;
                            public String call(Tuple2<String, String> arg0) throws Exception {
                                return arg0._2()+ "," +arg0._1();
                            }
                            }
                        )
                .mapToPair( // convert JavaRDD<String> to pairs ((player, cid), 1)
                        new PairFunction<String, String, Integer>() {
                            private static final long serialVersionUID = L;
                            public Tuple2<String, Integer> call(String x){              
                                return new Tuple2<String, Integer>(x, );
                            }
                        }
                    )
                .reduceByKey( // sum the ((player, cid), 1) pairs by key (player, cid)
                        new Function2<Integer, Integer, Integer>() {
                            private static final long serialVersionUID = L;
                            public Integer call(Integer arg0, Integer arg1) throws Exception {
                                return arg0+arg1;
                            }
                        }
                    )
                .persist(StorageLevel.MEMORY_ONLY());



        // Tencent hot list: entries whose "player,cid" key contains "tencent", sorted by count descending
        JavaPairRDD<Integer, String> tencentHotRdd = playerHotRdd
                            .filter(
                                    new Function<Tuple2<String,Integer>, Boolean>() {
                                        private static final long serialVersionUID = L;
                                        public Boolean call(Tuple2<String,Integer> playerHot) throws Exception {
                                            return playerHot._1().contains("tencent");
                                        }
                                    }
                                )
                            .mapToPair( // (key, count) -> (count, one field of the split key)
                                    new PairFunction<Tuple2<String,Integer>, Integer, String>() {
                                        private static final long serialVersionUID = L;
                                        public Tuple2<Integer,String> call(Tuple2<String,Integer> playerHoTuple2){
                                            return new Tuple2<Integer,String>(playerHoTuple2._2(),playerHoTuple2._1().split(",", -)[]);
                                        }
                                    }
                                )
                            .sortByKey(false)
                            .persist(StorageLevel.MEMORY_ONLY());

        System.out.println("<------------------------------------ 騰訊播放源熱榜 ------------------------------------------>");
        tencentHotRdd.repartition().saveAsTextFile(outputDir + "oneDay/playerHot/tencent");
//      ListIterator<Tuple2<Integer, String>> tencentHotRddRows = tencentHotRdd.collect().listIterator();
//      while(tencentHotRddRows.hasNext()){
//          Tuple2<Integer, String> dInteger = tencentHotRddRows.next();
//              System.out.println(dInteger);
//      }


        logger.info("\t\t<------------------------------------- SubStep Three-Four: Get the 'tags Best Visit List' statistics ...... ------------------------------->");
        //EPGInfo: cid, player, categorys_name, model, vip
        //UserInfoTans: cid, mac, p_log_date
        //recordTagsPairRDD: cid,(mac, p_log_date, tag)
        JavaPairRDD<String, Tuple2<String, Optional<String>>> recordTagsPairRDD = userInfoPairRDD
                .leftOuterJoin(
                    EPGInfoRDD
                    .mapToPair(
                            new PairFunction<String, String, String>() {
                                private static final long serialVersionUID = L;
                                public Tuple2<String, String> call(String x){               
                                    return new Tuple2<String, String>(x.split(",", -)[], x.split(",", -)[]);
                                }
                            }
                        )
                    .distinct()
                );      
        JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> tagHotPairRDD = recordTagsPairRDD
                .mapValues( // yields JavaPairRDD<String,String> (cid,tag)
                        new Function<Tuple2<String, Optional<String>>, String>() {
                            private static final long serialVersionUID = L;
                            public String call(Tuple2<String, Optional<String>> value) {
                                // Falls back to the "unknown" placeholder when get() throws.
                                try {
                                    return value._2().get();
                                } catch (Exception e) {
                                    return "未知";
                                }                               
                                }
                            }
                        )
                .map( // convert the JavaPairRDD to JavaRDD<String> "tag,cid"
                        new Function<Tuple2<String,String>,String>() {
                            private static final long serialVersionUID = L;
                            public String call(Tuple2<String, String> arg0) throws Exception {
                                return arg0._2()+ "," +arg0._1();
                                }
                            }
                        )
                .mapToPair( // convert JavaRDD<String> to pairs ((tag, cid), 1)
                        new PairFunction<String, String, Integer>() {
                            private static final long serialVersionUID = L;
                            public Tuple2<String, Integer> call(String x){              
                                return new Tuple2<String, Integer>(x, );
                            }
                        }
                    )
                .reduceByKey( // sum the ((tag, cid), 1) pairs by key (tag, cid)
                        new Function2<Integer, Integer, Integer>() {
                            private static final long serialVersionUID = L;
                            public Integer call(Integer arg0, Integer arg1) throws Exception {
                                return arg0+arg1;
                            }
                        }
                    )
                // NOTE(review): the text after "mapToPair(" on the next line reads like a comment
                // whose leading "//" is missing; as written it is not valid Java. Confirm and restore.
                // The function passed here swaps each pair to (count, "tag,cid") for sorting.
                .mapToPair( 将JavaRDD<Tuple2<String,String>,Integer>轉換為JavaRDD<Integer,Tuple2<String,String>> (1, (cid,tag))
                        new PairFunction<Tuple2<String,Integer>, Integer, String>() {
                            private static final long serialVersionUID = L;
                            public Tuple2<Integer, String> call(Tuple2<String, Integer> arg0){
                                return new Tuple2<Integer,String>(arg0._2(), arg0._1());
                            }
                        }                           
                    )
                .sortByKey(false)
                .mapToPair( // convert (count, "tag,cid") to (tag, (cid, count)) per the original comment
                        new PairFunction<Tuple2<Integer,String>, String, Tuple2<String,Integer>>() {
                            private static final long serialVersionUID = L;
                            public Tuple2<String, Tuple2<String, Integer>> call(Tuple2<Integer, String> arg0){      
                                return new Tuple2<String, Tuple2<String, Integer>>
                                (arg0._2().split(",")[], new Tuple2<String, Integer>(arg0._2.split(",")[], arg0._1()));
                            }

                        }
                    )
                .groupByKey() // aggregate by tag
                .sortByKey(false)               
                .persist(StorageLevel.MEMORY_ONLY());


        System.out.println("<------------------------------------ 各标簽熱榜 ------------------------------------------>");
        tagHotPairRDD.repartition().saveAsTextFile(outputDir + "oneDay/tagsHot/");


        // Shut the SparkContext down once every output has been written.
        sc.close();
    }

}