
Spark wordcount and common operators

1 Java wordcount: straight to the code

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class Testwordcount {
    private static final Pattern SPACE = Pattern.compile(" "); // delimiter used to split each line

    public static void main(String[] args) {
        String logFile = "D:\\ab.txt";
        SparkConf conf = new SparkConf().setMaster("local").setAppName("TestSpark");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> logData = sc.textFile(logFile).cache(); // read the file and cache it in memory

        JavaRDD<String> words = logData.flatMap(new FlatMapFunction<String, String>() { // split each line; here we split on spaces

            public Iterator<String> call(String s) {
                List<String> list = Arrays.asList(SPACE.split(s));
                return list.iterator();
            }
        });

        JavaPairRDD<String, Integer> countss = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1); // map each word to (word, 1)
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2; // sum the counts for each word
            }
        });

        // countss.saveAsTextFile(outfile);

        List<Tuple2<String, Integer>> output = countss.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }

        sc.close();
    }
}

Input file contents
flume sd hadoop hbase kylin hdfs hadoop sd sd flume hdfs

Execution output
20/05/12 10:10:47 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 23 ms on localhost (executor driver) (1/1)
20/05/12 10:10:47 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
20/05/12 10:10:47 INFO DAGScheduler: ResultStage 1 (collect at Testwordcount.java:58) finished in 0.024 s
20/05/12 10:10:47 INFO DAGScheduler: Job 0 finished: collect at Testwordcount.java:58, took 0.255136 s
hadoop: 2
flume: 2
kylin: 1
hdfs: 2
sd: 3
hbase: 1
           

2 Using common operators

Transformation operators

flatMap: splits the input, turning one element into many

JavaRDD<String> words = logData.flatMap(new FlatMapFunction<String, String>() {

            public Iterator<String> call(String s) {
                // split the line on spaces and return an iterator over the words
                List<String> list = Arrays.asList(SPACE.split(s));
                return list.iterator();
            }
        });

parallelize and makeRDD both turn a local collection into an RDD; under the hood makeRDD calls parallelize (makeRDD is only available in the Scala API).

Example: turning a List into an RDD

ArrayList<String> list = new ArrayList<String>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");

        JavaRDD<String> parallelize = sc.parallelize(list);

        JavaRDD<String> filter = parallelize.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                if (s.equals("2")) {
                    return false;
                }
                return true;
            }
        });

        List<String> collect = filter.collect();
        System.out.println(collect);
           

filter operator: filters the data; the return type is Boolean, where false drops the element and true lets it through

JavaRDD<String> filter = parallelize.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                if (s.equals("2")) {
                    return false; // drop the element "2"
                }
                return true; // keep everything else
            }
        });
           

Action operators

first: returns the first element of the dataset

count: returns the number of elements after processing

saveAsTextFile: writes the result out to a directory of text files; a short sketch of these three actions follows.
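A minimal sketch of these three actions, reusing the logData, words, and countss RDDs from the wordcount example above; the output path D:\\wordcount_out is a hypothetical placeholder (saveAsTextFile fails if the directory already exists).

// assumes logData, words and countss from the wordcount example above
String firstWord = words.first();              // first element of the RDD, e.g. "flume" for the input above
long lineCount = logData.count();              // number of lines in the input file
long distinctWords = countss.count();          // number of distinct words after reduceByKey
countss.saveAsTextFile("D:\\wordcount_out");   // hypothetical output directory; one part file per partition
System.out.println(firstWord + ", " + lineCount + ", " + distinctWords);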
