天天看点

Spark实现分组取 top N 示例 —— Java版

主要步骤:

  1. 加载 HDFS 文件生成RDD(textFile),并过滤掉空行(filter)
  2. 对RDD进行转换:以省份作为键,将要排序的属性(销量)与原始记录一起作为值分离出来,生成新的RDD(mapToPair)
  3. 按键分组(groupByKey),再在每个分组内基于第二步分离出的属性降序排序,并取排序结果的前 N 条(mapToPair)
package rddDemo.examples;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.*;

/**
 * Spark "top N per group" example using the Java RDD API.
 *
 * <p>Each non-blank input line is expected to contain whitespace-separated fields
 * {@code province city saleCount}, for example:
 * <pre>
 *     anhui hefei 20
 *     jiangsu nanjing 90
 *     shandong jinan 100
 * </pre>
 * The job groups the lines by province and prints, for every province, the lines with the
 * {@code N} highest sale counts, largest first.
 *
 * <p>Usage: {@code TopNWithGroupDemoJava [inputPath [topN]]}. Both arguments are optional; they
 * default to the demo HDFS file and to 3.
 *
 * <p>Created by asus on 2018/7/29.
 */
public class TopNWithGroupDemoJava {

    /** Input file read when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "hdfs://192.168.204.130:9000/log_file/sale_data.txt";

    /** Number of records kept per province when no N is given on the command line. */
    private static final int DEFAULT_TOP_N = 3;

    /**
     * Orders sale records {@code (saleCount, line)} by ascending sale count, breaking ties by the
     * raw line so the result does not depend on the order of values inside a group.
     * Integer.compare is used because subtraction-based comparison can overflow.
     */
    private static final Comparator<Tuple2<Integer, String>> BY_SALE_COUNT =
            new Comparator<Tuple2<Integer, String>>() {
                @Override
                public int compare(Tuple2<Integer, String> a, Tuple2<Integer, String> b) {
                    int bySaleCount = Integer.compare(a._1, b._1);
                    return bySaleCount != 0 ? bySaleCount : a._2.compareTo(b._2);
                }
            };

    /**
     * Runs the job.
     *
     * @param args optional {@code [inputPath [topN]]}
     * @throws IllegalArgumentException if topN is less than 1 or not an integer
     */
    public static void main(String[] args) {
        final String filepath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        final int topN = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_TOP_N;
        if (topN < 1) {
            throw new IllegalArgumentException("topN must be at least 1, got " + topN);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("TopNWithGroupDemoJava");
        conf.setMaster("local[2]");

        // Local Hadoop install used by this demo (a Windows path). Only applied when the caller
        // has not already provided hadoop.home.dir, e.g. via -Dhadoop.home.dir=...
        if (System.getProperty("hadoop.home.dir") == null) {
            System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0");
        }

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // Load the file with a minimum of 2 partitions and drop blank lines.
            JavaRDD<String> saleRDD = sc.textFile(filepath, 2).filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String line) {
                    return line.trim().length() > 0;
                }
            });

            // Key each record by province (the group key). The value carries the sort attribute
            // (saleCount) together with the original line.
            JavaPairRDD<String, Tuple2<Integer, String>> saleWithProvinceRDD = saleRDD.mapToPair(
                    new PairFunction<String, String, Tuple2<Integer, String>>() {
                        @Override
                        public Tuple2<String, Tuple2<Integer, String>> call(String line) {
                            return parseSaleRecord(line);
                        }
                    });

            // Group by province. NOTE: groupByKey holds all values of one key in memory on one
            // executor; for very large groups an aggregateByKey with a bounded heap scales better.
            JavaPairRDD<String, Iterable<Tuple2<Integer, String>>> saleGroupByProvinceRDD =
                    saleWithProvinceRDD.groupByKey();

            // Keep the top N records of each group. call() runs on executors, where no
            // SparkContext exists, so a group cannot be turned into a nested RDD and sorted there.
            JavaPairRDD<String, Iterable<Tuple2<Integer, String>>> saleGroupByProvinceTopNRDD =
                    saleGroupByProvinceRDD.mapToPair(
                            new PairFunction<Tuple2<String, Iterable<Tuple2<Integer, String>>>,
                                    String, Iterable<Tuple2<Integer, String>>>() {
                                private static final long serialVersionUID = 1L;

                                @Override
                                public Tuple2<String, Iterable<Tuple2<Integer, String>>> call(
                                        Tuple2<String, Iterable<Tuple2<Integer, String>>> group) {
                                    return new Tuple2<String, Iterable<Tuple2<Integer, String>>>(
                                            group._1, topNBySaleCount(group._2, topN));
                                }
                            });

            // foreach runs on the executors; with the local master set above, that is this JVM's console.
            saleGroupByProvinceTopNRDD.foreach(
                    new VoidFunction<Tuple2<String, Iterable<Tuple2<Integer, String>>>>() {
                        @Override
                        public void call(Tuple2<String, Iterable<Tuple2<Integer, String>>> group) {
                            for (Tuple2<Integer, String> record : group._2) {
                                System.out.println("province : " + group._1 + " saleCount : "
                                        + record._1 + " info : " + record._2);
                            }
                        }
                    });
        } finally {
            sc.stop();
        }
    }

    /**
     * Parses one {@code province city saleCount} line into {@code (province, (saleCount, line))}.
     *
     * @param line a non-blank input line; surrounding whitespace is ignored when splitting
     * @return the province as key and {@code (saleCount, original line)} as value
     * @throws IllegalArgumentException if the line has fewer than three fields or the third
     *     field is not an integer
     */
    private static Tuple2<String, Tuple2<Integer, String>> parseSaleRecord(String line) {
        // Trim first: splitting " anhui hefei 20" directly would yield an empty first field.
        String[] fields = line.trim().split("\\s+");
        if (fields.length < 3) {
            throw new IllegalArgumentException("Expected 'province city saleCount' but got: " + line);
        }
        int saleCount;
        try {
            saleCount = Integer.parseInt(fields[2]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid sale count in line: " + line, e);
        }
        return new Tuple2<>(fields[0], new Tuple2<>(saleCount, line));
    }

    /**
     * Returns the {@code n} records with the largest sale count, ordered largest first.
     * A group with fewer than {@code n} records is returned whole.
     *
     * @param records the {@code (saleCount, line)} records of one group
     * @param n how many records to keep; expected to be at least 1
     * @return a new ArrayList, which (unlike an ArrayList.subList view) is serializable
     */
    private static List<Tuple2<Integer, String>> topNBySaleCount(
            Iterable<Tuple2<Integer, String>> records, int n) {
        // Min-heap bounded to n elements: its head is the weakest of the current top n, so it is
        // the one evicted when a new record pushes the size past n. The initial capacity is only a
        // hint, so it is capped to avoid pre-allocating a huge array for a large n.
        PriorityQueue<Tuple2<Integer, String>> heap =
                new PriorityQueue<>(Math.min(n, 1024) + 1, BY_SALE_COUNT);
        for (Tuple2<Integer, String> record : records) {
            heap.offer(record);
            if (heap.size() > n) {
                heap.poll();
            }
        }
        List<Tuple2<Integer, String>> top = new ArrayList<>(heap);
        Collections.sort(top, Collections.reverseOrder(BY_SALE_COUNT));
        return top;
    }
}
           

继续阅读