// Spark實作熱榜的統計 (Hot-list statistics implemented with Spark)
package com.changhong.laodixiao.HotCalculate;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.SparkConf;
import scala.Tuple2;
public final class HotStatistics {
private static Logger logger = Logger.getLogger(HotStatistics.class);
public static void main(String[] args) {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//設定日期格式
System.out.println(df.format(new Date()));
String EPGInfoDir = "/data/laodi.xiao/zzzHotCal/getEPG/data/";
String EPGInfoStarDir = "/data/laodi.xiao/zzzHotCal/getEPGStar/data/";
String UserActionDataDir = "/data/laodi.xiao/zzzHotCal/getUserAction/data/";
String OutputDir = "/data/laodi.xiao/zzzHotCal/output/data/";
if(args.length==){
EPGInfoDir = args[];
EPGInfoStarDir = args[];
UserActionDataDir = args[];
OutputDir = args[];
}else {
System.err.println("Usage: $JAVA_HOME/java -jar HotCalcualte.jar EPGInfoDir EPGInfoStarDir UserActionDataDir OutputDir");
}
logger.info("<--------------------------------------------- Start Calculate Star Film List ...... ------------------------------->");
HotStatistics.getStarFilmListUsingSpark(EPGInfoStarDir, OutputDir);
logger.info("<--------------------------------------------- Star Film List Calculated ! ------------------------------->");
logger.info("<--------------------------------------------- Start Calculate Best PlayMedia Lists for One Day ...... ------------------------------->");
HotStatistics.getHotStatisticsForOneDayUsingSpark(EPGInfoDir, UserActionDataDir, OutputDir);
logger.info("<--------------------------------------------- Best PlayMedia Lists for One Day Calculated ! ------------------------------->");
System.out.println(df.format(new Date()));
}
public static void getStarFilmListUsingSpark(String ePGInfoStarDir, String outputDir) {
logger.info("<--------------------------------------------- Step One: Initializing the SparkContext ...... ------------------------------->");
SparkConf conf = new SparkConf().setMaster("local").setAppName("HotList statistics using Spark");
JavaSparkContext sc = new JavaSparkContext(conf);
logger.info("SparkContext initialized !");
logger.info("<--------------------------------------------- Step Two: Reading data from " +ePGInfoStarDir+ " ...... ------------------------------->");
// EPGInfoStar: star,cid,weight
JavaRDD<String> EPGInfoStarRDD = sc.textFile(ePGInfoStarDir+"EPGStarInfo.txt");
JavaPairRDD<String, Iterable<String>> starFilmList = EPGInfoStarRDD.distinct()
.mapToPair( //JavaRDD<String>轉化為JavaPairRDD<Double,String> (weight, "star,cid,weight")
new PairFunction<String, Double, String>() {
private static final long serialVersionUID = L;
public Tuple2<Double, String> call(String arg0){
return new Tuple2<Double, String>(Double.parseDouble(arg0.split(",")[]), arg0);
}
}
)
.sortByKey(false) //按照權重排序
.mapToPair(
new PairFunction<Tuple2<Double,String>, String, String>() {
private static final long serialVersionUID = L;
public Tuple2<String, String> call(Tuple2<Double, String> arg0){
return new Tuple2<String, String>(arg0._2().split(",")[], arg0._2().split(",")[]+","+arg0._2().split(",")[]);
}
}
)
.groupByKey()
.persist(StorageLevel.MEMORY_ONLY());
//輸出統計結果
starFilmList.repartition().saveAsTextFile(outputDir+"starFilmList/");
sc.close();
}
public static void getHotStatisticsForOneDayUsingSpark(String ePGInfoDir, String userActionDataDir, String outputDir) {
logger.info("<--------------------------------------------- Step One: Initializing the SparkContext ...... ------------------------------->");
SparkConf conf = new SparkConf().setMaster("local").setAppName("HotList statistics using Spark");
JavaSparkContext sc = new JavaSparkContext(conf);
logger.info("SparkContext initialized !");
logger.info("<--------------------------------------------- Step Two: Reading data from ./data/* ...... ------------------------------->");
// EPGInfo: cid, player, categorys_name, model, vip
JavaRDD<String> EPGInfoRDD = sc.textFile(ePGInfoDir + "EPGInfo.txt");
// UserInfoTans: cid, mac, p_log_date
JavaRDD<String> UserInfoRDD = sc.textFile(userActionDataDir + "UserInfo01Day.txt");
logger.info("Data prepared !");
EPGInfoRDD.persist(StorageLevel.MEMORY_ONLY());
UserInfoRDD.persist(StorageLevel.MEMORY_ONLY());
//将UserInfo按照cid劃分為key-value PairRDD
PairFunction<String, String, String> USerKeyDataFunction = new PairFunction<String, String, String>() {
private static final long serialVersionUID = L;
public Tuple2<String, String> call(String x){
String[] tmp = x.split(",", -);
return new Tuple2<String, String>(tmp[], tmp[]+","+tmp[]);
}
};
JavaPairRDD<String, String> userInfoPairRDD = UserInfoRDD
.filter(
new Function<String, Boolean>() {
private static final long serialVersionUID = L;
public Boolean call(String arg0)throws Exception {
return (!arg0.contains("GC") && arg0.length()>);
}
}
)
.mapToPair(USerKeyDataFunction)
.distinct();
logger.info("<--------------------------------------------- Step Three: Get the statistics ...... ------------------------------->");
logger.info("\t\t<------------------------------------- SubStep Three-One: Get the 'model Best Visit List' statistics ...... ------------------------------->");
// EPGInfo: cid, player, categorys_name, model, vip EPGInfoRDD
// UserInfoTans: cid, mac, p_log_date userInfoPairRDD
//tv的EPG資料
JavaRDD<String> tvEPGInfoRDD = EPGInfoRDD.filter(new Function<String, Boolean>() {
private static final long serialVersionUID = L;
public Boolean call(String arg0) throws Exception {
return arg0.contains("tv");
}
});
//movie的EPG資料
JavaRDD<String> movieEPGInfoRDD = EPGInfoRDD.filter(new Function<String, Boolean>() {
private static final long serialVersionUID = L;
public Boolean call(String arg0) throws Exception {
return arg0.contains("movie");
}
});
JavaPairRDD<String, Iterable<String>> tvHotRdd = userInfoPairRDD //UserInfoTans: cid, mac, p_log_date
.leftOuterJoin( //擷取 [cid, (mac, p_log_date, categorys_name)]
tvEPGInfoRDD //EPGInfo: cid, player, categorys_name, model, vip
.mapToPair(
new PairFunction<String, String, String>() {
private static final long serialVersionUID = L;
public Tuple2<String, String> call(String arg0)throws Exception {
String[] tmp = arg0.split(",", -);
return new Tuple2<String, String>(tmp[], tmp[]);
}
}
)
)
.mapToPair( //擷取(cid,categorys_name,1)
new PairFunction<Tuple2<String,Tuple2<String,Optional<String>>>, String, Integer>() {
private static final long serialVersionUID = L;
public Tuple2<String, Integer> call(Tuple2<String, Tuple2<String, Optional<String>>> arg0)throws Exception {
try{
return new Tuple2<String, Integer>(arg0._1+","+arg0._2._2.get(), );
}catch(Exception e){
return new Tuple2<String, Integer>("未知,未知", );
}
}
}
)
.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = L;
public Integer call(Integer arg0, Integer arg1)throws Exception {
return arg0+arg1;
}
}
)
.mapToPair(
new PairFunction<Tuple2<String,Integer>, Integer, String>() {
private static final long serialVersionUID = L;
public Tuple2<Integer, String> call(Tuple2<String, Integer> arg0)throws Exception {
return new Tuple2<Integer, String>(arg0._2, arg0._1);
}
}
)
.sortByKey(false) //(n, cid,categorys_name)
.mapToPair(
new PairFunction<Tuple2<Integer,String>, String, String>() {
private static final long serialVersionUID = L;
public Tuple2<String, String> call(Tuple2<Integer, String> arg0)throws Exception {
String[] tmp = arg0._2.split(",", -);
return new Tuple2<String, String>(tmp[], tmp[]+","+arg0._1.toString());
}
}
)
.groupByKey()
.sortByKey(false)
.persist(StorageLevel.MEMORY_ONLY());
System.out.println("<------------------------------------ 電視劇熱榜 ------------------------------------------>");
tvHotRdd.repartition().saveAsTextFile(outputDir + "oneDay/modelHot/tv");
JavaPairRDD<String, Iterable<String>> movieHotRdd = userInfoPairRDD //UserInfoTans: cid, mac, p_log_date
.leftOuterJoin( //擷取 [cid, (mac, p_log_date, categorys_name)]
movieEPGInfoRDD //EPGInfo: cid, player, categorys_name, model, vip
.mapToPair(
new PairFunction<String, String, String>() {
private static final long serialVersionUID = L;
public Tuple2<String, String> call(String arg0)throws Exception {
String[] tmp = arg0.split(",", -);
return new Tuple2<String, String>(tmp[], tmp[]);
}
}
)
)
.mapToPair( //擷取(cid,categorys_name,1)
new PairFunction<Tuple2<String,Tuple2<String,Optional<String>>>, String, Integer>() {
private static final long serialVersionUID = L;
public Tuple2<String, Integer> call(Tuple2<String, Tuple2<String, Optional<String>>> arg0)throws Exception {
try{
return new Tuple2<String, Integer>(arg0._1+","+arg0._2._2.get(), );
}catch(Exception e){
return new Tuple2<String, Integer>("未知,未知", );
}
}
}
)
.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = L;
public Integer call(Integer arg0, Integer arg1)throws Exception {
return arg0+arg1;
}
}
)
.mapToPair(
new PairFunction<Tuple2<String,Integer>, Integer, String>() {
private static final long serialVersionUID = L;
public Tuple2<Integer, String> call(Tuple2<String, Integer> arg0)throws Exception {
return new Tuple2<Integer, String>(arg0._2, arg0._1);
}
}
)
.sortByKey(false) //(n, cid,categorys_name)
.mapToPair(
new PairFunction<Tuple2<Integer,String>, String, String>() {
private static final long serialVersionUID = L;
public Tuple2<String, String> call(Tuple2<Integer, String> arg0)throws Exception {
String[] tmp = arg0._2.split(",", -);
return new Tuple2<String, String>(tmp[], tmp[]+","+arg0._1.toString());
}
}
)
.groupByKey()
.sortByKey(false)
.persist(StorageLevel.MEMORY_ONLY());
System.out.println("<------------------------------------- 電影熱榜 ------------------------------------------->");
movieHotRdd.repartition().saveAsTextFile(outputDir + "oneDay/modelHot/movie");
logger.info("\t\t<------------------------------------- SubStep Three-Two: Get the 'VIP Best Visit List' statistics ...... ------------------------------->");
//recordVipPairRDD: cid,(mac, p_log_date, vip)
JavaPairRDD<String, Tuple2<String, Optional<String>>> recordVipPairRDD = userInfoPairRDD
.leftOuterJoin(
EPGInfoRDD
.mapToPair(
new PairFunction<String, String, String>() {
private static final long serialVersionUID = L;
public Tuple2<String, String> call(String x){
return new Tuple2<String, String>(x.split(",", -)[], x.split(",", -)[]);
}
}
)
.distinct()
);
// cid,(mac, p_log_date, vip)
JavaPairRDD<Integer, String> vipHotRdd = recordVipPairRDD
.mapValues( //擷取JavaPairRDD<String,String> (cid,vip)
new Function<Tuple2<String, Optional<String>>, Integer>() {
private static final long serialVersionUID = L;
public Integer call(Tuple2<String, Optional<String>> value) {
try {
return Integer.parseInt(value._2().get());
} catch (Exception e) {
return Integer.parseInt("-1");
}
}
}
)
.filter( //擷取VIP影片
new Function<Tuple2<String, Integer>, Boolean>() {
private static final long serialVersionUID = L;
public Boolean call(Tuple2<String, Integer> arg0) {
return (arg0._2()==);
}
}
)
.mapToPair( //将JavaPairRDD<String, Integer> (cid, vip) 轉換為JavaPairRDD<String, Integer> (cid, 1)
new PairFunction<Tuple2<String, Integer>, String, Integer>() {
private static final long serialVersionUID = L;
public Tuple2<String, Integer> call(Tuple2<String, Integer> arg0) {
return new Tuple2<String, Integer>(arg0._1(), );
}
}
)
.reduceByKey( //JavaRDD<String> (cid, 1)按鍵累加
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = L;
public Integer call(Integer arg0, Integer arg1) throws Exception {
return arg0+arg1;
}
}
)
.mapToPair(
new PairFunction<Tuple2<String,Integer>, Integer, String>() {
private static final long serialVersionUID = L;
public Tuple2<Integer,String> call(Tuple2<String,Integer> modelHoTuple2){
return new Tuple2<Integer,String>(modelHoTuple2._2(),modelHoTuple2._1());
}
}
)
.sortByKey(false)
.persist(StorageLevel.MEMORY_ONLY());
System.out.println("<------------------------------------ VIP熱榜 ------------------------------------------>");
vipHotRdd.repartition().saveAsTextFile(outputDir + "oneDay/vipHot");
// ListIterator<Tuple2<Integer, String>> vipHotRddRows = vipHotRdd.collect().listIterator();
// while(vipHotRddRows.hasNext()){
// Tuple2<Integer, String> dInteger = vipHotRddRows.next();
// System.out.println(dInteger);
// }
logger.info("\t\t<------------------------------------- SubStep Three-Three: Get the 'player Best Visit List' statistics ...... ------------------------------->");
//EPGInfo: cid, player, categorys_name, model, vip
//UserInfoTans: cid, mac, p_log_date
//recordPlayerPairRDD: cid,(mac, p_log_date, player)
JavaPairRDD<String, Tuple2<String, Optional<String>>> recordPlayerPairRDD = userInfoPairRDD
.leftOuterJoin(
EPGInfoRDD
.mapToPair(
new PairFunction<String, String, String>() {
private static final long serialVersionUID = L;
public Tuple2<String, String> call(String x){
return new Tuple2<String, String>(x.split(",", -)[], x.split(",", -)[]);
}
}
)
.distinct()
);
// cid,(mac, p_log_date, player)
JavaPairRDD<String, Integer> playerHotRdd = recordPlayerPairRDD
.mapValues( //擷取JavaPairRDD<String,String> (cid,player)
new Function<Tuple2<String, Optional<String>>, String>() {
private static final long serialVersionUID = L;
public String call(Tuple2<String, Optional<String>> value) {
try {
return value._2().get();
} catch (Exception e) {
return "未知";
}
}
}
)
.map( //将JavaPairRDD轉換為JavaRDD<String> (player, cid)
new Function<Tuple2<String,String>,String>() {
private static final long serialVersionUID = L;
public String call(Tuple2<String, String> arg0) throws Exception {
return arg0._2()+ "," +arg0._1();
}
}
)
.mapToPair( //将JavaRDD<String>轉換為JavaRDD<String> ((player, cid), 1)
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = L;
public Tuple2<String, Integer> call(String x){
return new Tuple2<String, Integer>(x, );
}
}
)
.reduceByKey( //JavaRDD<String> ((player, cid), 1)按鍵(player, cid)累加
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = L;
public Integer call(Integer arg0, Integer arg1) throws Exception {
return arg0+arg1;
}
}
)
.persist(StorageLevel.MEMORY_ONLY());
//騰訊熱榜
JavaPairRDD<Integer, String> tencentHotRdd = playerHotRdd
.filter(
new Function<Tuple2<String,Integer>, Boolean>() {
private static final long serialVersionUID = L;
public Boolean call(Tuple2<String,Integer> playerHot) throws Exception {
return playerHot._1().contains("tencent");
}
}
)
.mapToPair(
new PairFunction<Tuple2<String,Integer>, Integer, String>() {
private static final long serialVersionUID = L;
public Tuple2<Integer,String> call(Tuple2<String,Integer> playerHoTuple2){
return new Tuple2<Integer,String>(playerHoTuple2._2(),playerHoTuple2._1().split(",", -)[]);
}
}
)
.sortByKey(false)
.persist(StorageLevel.MEMORY_ONLY());
System.out.println("<------------------------------------ 騰訊播放源熱榜 ------------------------------------------>");
tencentHotRdd.repartition().saveAsTextFile(outputDir + "oneDay/playerHot/tencent");
// ListIterator<Tuple2<Integer, String>> tencentHotRddRows = tencentHotRdd.collect().listIterator();
// while(tencentHotRddRows.hasNext()){
// Tuple2<Integer, String> dInteger = tencentHotRddRows.next();
// System.out.println(dInteger);
// }
logger.info("\t\t<------------------------------------- SubStep Three-Four: Get the 'tags Best Visit List' statistics ...... ------------------------------->");
//EPGInfo: cid, player, categorys_name, model, vip
//UserInfoTans: cid, mac, p_log_date
//recordTagsPairRDD: cid,(mac, p_log_date, tag)
JavaPairRDD<String, Tuple2<String, Optional<String>>> recordTagsPairRDD = userInfoPairRDD
.leftOuterJoin(
EPGInfoRDD
.mapToPair(
new PairFunction<String, String, String>() {
private static final long serialVersionUID = L;
public Tuple2<String, String> call(String x){
return new Tuple2<String, String>(x.split(",", -)[], x.split(",", -)[]);
}
}
)
.distinct()
);
JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> tagHotPairRDD = recordTagsPairRDD
.mapValues( //擷取JavaPairRDD<String,String> (cid,tag)
new Function<Tuple2<String, Optional<String>>, String>() {
private static final long serialVersionUID = L;
public String call(Tuple2<String, Optional<String>> value) {
try {
return value._2().get();
} catch (Exception e) {
return "未知";
}
}
}
)
.map( //将JavaPairRDD轉換為JavaRDD<String> (tag, cid)
new Function<Tuple2<String,String>,String>() {
private static final long serialVersionUID = L;
public String call(Tuple2<String, String> arg0) throws Exception {
return arg0._2()+ "," +arg0._1();
}
}
)
.mapToPair( //将JavaRDD<String>轉換為JavaRDD<String> ((tag, cid), 1)
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = L;
public Tuple2<String, Integer> call(String x){
return new Tuple2<String, Integer>(x, );
}
}
)
.reduceByKey( //JavaRDD<String> ((tag, cid), 1)按鍵(tag, cid)累加
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = L;
public Integer call(Integer arg0, Integer arg1) throws Exception {
return arg0+arg1;
}
}
)
.mapToPair( 将JavaRDD<Tuple2<String,String>,Integer>轉換為JavaRDD<Integer,Tuple2<String,String>> (1, (cid,tag))
new PairFunction<Tuple2<String,Integer>, Integer, String>() {
private static final long serialVersionUID = L;
public Tuple2<Integer, String> call(Tuple2<String, Integer> arg0){
return new Tuple2<Integer,String>(arg0._2(), arg0._1());
}
}
)
.sortByKey(false)
.mapToPair( //将JavaRDD<Tuple2<String,String>,Integer>轉換為JavaRDD<String,Tuple2<String,Integer>> (tag, (cid,1))
new PairFunction<Tuple2<Integer,String>, String, Tuple2<String,Integer>>() {
private static final long serialVersionUID = L;
public Tuple2<String, Tuple2<String, Integer>> call(Tuple2<Integer, String> arg0){
return new Tuple2<String, Tuple2<String, Integer>>
(arg0._2().split(",")[], new Tuple2<String, Integer>(arg0._2.split(",")[], arg0._1()));
}
}
)
.groupByKey() //按照tag進行聚合操作
.sortByKey(false)
.persist(StorageLevel.MEMORY_ONLY());
System.out.println("<------------------------------------ 各标簽熱榜 ------------------------------------------>");
tagHotPairRDD.repartition().saveAsTextFile(outputDir + "oneDay/tagsHot/");
sc.close();
}
}