
Transformation operators: Java and Scala example code

Java code:

package com.netcloud.spark.sparkcore.transformation;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Transformation operator practice
 * map: multiply every element in the collection by 2
 * filter: keep only the even numbers in the collection
 * flatMap: split lines of text into individual words
 * groupByKey: group the class scores
 * reduceByKey: compute the total score of each class
 * sortByKey: sort the student scores; the default is true (ascending)
 * join: print every student's score
 * cogroup: print every student's scores
 *
 * @author yangshaojun
 * #date  2019/3/9 17:10
 * @version 1.0
 */
public class TransFormationPractice {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TransFormationPractice").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //map(sc);
        //filter(sc);
        //flatMap(sc);
        //groupByKey(sc);
        //reduceByKey(sc);
        //sortByKey(sc);
        //join(sc);
        cogroup(sc);
        sc.stop();

    }

    /**
     * Using the map operator
     *
     * @param sc
     */
    private static void map(JavaSparkContext sc) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        /**
         * Use the map operator to multiply every element in the collection by 2.
         * map can be called on an RDD of any element type.
         * In Java, the map operator takes a Function object.
         * When creating the Function, you must specify its second generic type
         * parameter, which is the type of the new elements being returned;
         * the return type of call() must match it.
         * Inside call(), you can process each element of the original RDD however
         * you like and return a new element; the new elements form a new RDD.
         */
        JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {

            /**
             * call() receives 1, 2, 3, 4, 5
             * and returns 2, 4, 6, 8, 10.
             *
             * @param v1
             * @return
             * @throws Exception
             */
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 * 2;
            }
        });
        // Print the new RDD
        multipleNumberRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

    }

    /**
     * filter: keep only the even numbers in the collection
     */
    private static void filter(JavaSparkContext sc) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);
        JavaRDD<Integer> evennumRDD = numbersRDD.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer v1) throws Exception {
                return v1 % 2 == 0;
            }
        });
        evennumRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
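        // Expected output (order may vary across partitions): 2, 4, 6, 8, 10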
    }

    /**
     * flatMap: split lines of text into individual words
     */
    private static void flatMap(JavaSparkContext sc) {
        List<String> numbers = Arrays.asList("hello you", "hello me", "hello world");
        JavaRDD<String> numbersRDD = sc.parallelize(numbers);
        /**
         * In Java, the flatMap operator takes a FlatMapFunction.
         * You must specify FlatMapFunction's second generic type parameter,
         * which is the type of the new elements being returned.
         * Note: in Spark 1.x, call() returns Iterable<U>; since Spark 2.x it
         * returns Iterator<U>. The code below targets the Spark 2.x API.
         */
        JavaRDD<String> wordsRDD = numbersRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                // Split each line on spaces and return an iterator over the words
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        wordsRDD.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
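        // Expected output, one word per line (order may vary): hello you hello me hello world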
    }

    /**
     * groupByKey: group the scores by class.
     * groupByKey still returns a JavaPairRDD, but its second generic type
     * parameter becomes Iterable<Integer>.
     */
    private static void groupByKey(JavaSparkContext sc) {

        List<Tuple2<String, Integer>> scoresList = Arrays.asList(new Tuple2<String, Integer>("class1", 80),
                new Tuple2<String, Integer>("class2", 87),
                new Tuple2<String, Integer>("class1", 90),
                new Tuple2<String, Integer>("class1", 65));
        // Parallelize the collection into a JavaPairRDD; each element is a Tuple2
        JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoresList);
        JavaPairRDD<String, Iterable<Integer>> groupedscore = scores.groupByKey();
        groupedscore.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
                                 @Override
                                 public void call(Tuple2<String, Iterable<Integer>> t1) throws Exception {
                                     Iterator<Integer> ite = t1._2.iterator();
                                     while (ite.hasNext()) {
                                         System.out.println(ite.next());
                                     }
                                     System.out.println("=============================");

                                 }
                             }
        );
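        /**
         * Expected output (group and line order may vary): each class's scores,
         * followed by a separator line, e.g.
         * 80
         * 90
         * 65
         * =============================
         * 87
         * =============================
         */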

    }

    /**
     * reduceByKey: compute the total score of each class
     *
     * @param sc
     */
    public static void reduceByKey(JavaSparkContext sc) {
        List<Tuple2<String, Integer>> scoresList = Arrays.asList(new Tuple2<String, Integer>("class1", 80),
                new Tuple2<String, Integer>("class2", 87),
                new Tuple2<String, Integer>("class2", 90),
                new Tuple2<String, Integer>("class1", 65));
        // Parallelize the collection into a JavaPairRDD; each element is a Tuple2
        JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoresList);
        /**
         * reduceByKey takes a Function2, which has three generic type parameters.
         * The first two represent the value type of the original RDD: for each key,
         * reduce is applied pairwise, first to the first and second values, then to
         * that partial result and the next value, and so on. The first two generic
         * types therefore define the types of call()'s two input parameters.
         * The third generic type parameter is the return type of each reduce step,
         * which normally matches the value type of the original RDD.
         * reduceByKey returns another JavaPairRDD<Key, Value>.
         */

        JavaPairRDD<String, Integer> reduceRDD = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        reduceRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> t1) throws Exception {
                System.out.println("class:" + t1._1 + " " + "totalscore:" + t1._2);
            }
        });
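        // Expected output (order may vary):
        // class:class1 totalscore:145
        // class:class2 totalscore:177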


    }

    /**
     * sortByKey: sort by score
     *
     * @param sc
     */
    public static void sortByKey(JavaSparkContext sc) {
        List<Tuple2<Integer, String>> scoresList =
                Arrays.asList(
                        new Tuple2<Integer, String>(65, "leo"),
                        new Tuple2<Integer, String>(50, "leo1"),
                        new Tuple2<Integer, String>(100, "leo2")

                );
        JavaPairRDD<Integer, String> scoreRDD = sc.parallelizePairs(scoresList, 1);
        /**
         * In Java, sortByKey takes a boolean argument; the default is true (ascending order).
         */
        JavaPairRDD<Integer, String> retRDD = scoreRDD.sortByKey();
        retRDD.foreach(new VoidFunction<Tuple2<Integer, String>>() {
            @Override
            public void call(Tuple2<Integer, String> t1) throws Exception {
                System.out.println(t1._1 + ":" + t1._2);
            }
        });
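        // Expected output (ascending by key; single partition, so the order is deterministic):
        // 50:leo1
        // 65:leo
        // 100:leo2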

    }

    /**
     * join: print every student's score
     *
     * @param sc
     */
    public static void join(JavaSparkContext sc) {

        List<Tuple2<Integer, String>> students = Arrays.asList(
                new Tuple2<>(1, "tom"),
                new Tuple2<>(2, "jack"),
                new Tuple2<>(3, "marry"),
                new Tuple2<>(4, "ff"));
        List<Tuple2<Integer, Integer>> scores = Arrays.asList(
                new Tuple2<>(1, 80),
                new Tuple2<>(2, 90),
                new Tuple2<>(3, 100),
                new Tuple2<>(4, 60));
        JavaPairRDD<Integer, String> studentRDD = sc.parallelizePairs(students);
        JavaPairRDD<Integer, Integer> scoreRDD = sc.parallelizePairs(scores);
        JavaPairRDD<Integer, Tuple2<String, Integer>> retRDD = studentRDD.join(scoreRDD);
        /**
         * The join operator performs an equi-join of two RDDs.
         * The join is done by key and returns a JavaPairRDD whose first generic
         * type is the key type of the two input RDDs and whose second is
         * Tuple2<V1, V2>, where V1 and V2 are the value types of the original RDDs.
         * For example, joining an RDD of (1,1) (1,2) (1,3)
         * with an RDD of (1,4) (2,1) (2,2)
         * yields (1,(1,4)) (1,(2,4)) (1,(3,4)).
         */
        retRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<String, Integer>> t1) throws Exception {
                System.out.println(t1);
            }
        });
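        // Expected output (order may vary):
        // (1,(tom,80))
        // (2,(jack,90))
        // (3,(marry,100))
        // (4,(ff,60))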
    }

    /**
     * cogroup: print every student's scores
     *
     * @param sc
     */
    public static void cogroup(JavaSparkContext sc) {

        List<Tuple2<Integer, String>> students = Arrays.asList(
                new Tuple2<>(1, "tom"),
                new Tuple2<>(2, "jack"),
                new Tuple2<>(3, "marry"),
                new Tuple2<>(4, "ff"));
        List<Tuple2<Integer, Integer>> scores = Arrays.asList(
                new Tuple2<>(1, 80),
                new Tuple2<>(2, 90),
                new Tuple2<>(3, 100),
                new Tuple2<>(1, 60),
                new Tuple2<>(2, 80));
        JavaPairRDD<Integer, String> studentRDD = sc.parallelizePairs(students);
        JavaPairRDD<Integer, Integer> scoreRDD = sc.parallelizePairs(scores);
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> retRDD = studentRDD.cogroup(scoreRDD);
        /**
         * cogroup differs from join:
         * for each key, ALL of its values from each of the two RDDs are
         * collected into a single Iterable.
         * It is easiest to understand by running an example and inspecting the output.
         */
        retRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t1) throws Exception {
                System.out.println(t1);

            }
        });
        /**
         * Expected output (order may vary):
         * (1,([tom],[80, 60]))
         * (2,([jack],[90, 80]))
         * (3,([marry],[100]))
         * (4,([ff],[]))
         */
    }

}

Scala code:

package com.netcloud.bigdata.spark_core.basiclearning.transform


import org.apache.spark.{SparkConf, SparkContext}

/**
  * Transformation operator practice
  * map: multiply every element in the collection by 2
  * filter: keep only the even numbers in the collection
  * flatMap: split lines of text into individual words
  * groupByKey: group the class scores
  * reduceByKey: compute the total score of each class
  * sortByKey: sort the class scores; ascending by default
  * join: print every student's score
  * cogroup: print every student's scores
  *
  * @author yangshaojun
  * #date  2019/3/9 17:08
  * @version 1.0
  */
object Transform_0020_TransformPractice {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("TransFormationPractice").setMaster("local[2]")
    val sc = new SparkContext(conf)
    //    map(sc)
    //    filter(sc)
    //    flatMap(sc)
    //    groupByKey(sc)
    //    reduceByKey(sc)
    //    sortByKey(sc)
    //    join(sc)
    cogroup(sc)
    sc.stop()

  }

  /**
    * map: multiply every element in the collection by 2
    *
    * @param sc
    */
  def map(sc: SparkContext): Unit = {
    val numlist = List(1, 2, 3, 4, 5)
    val numberRDD = sc.parallelize(numlist)
    /**
      * Use the map operator to multiply every element in the collection by 2.
      */
    val multipleNumberRDD = numberRDD.map(_ * 2)
    multipleNumberRDD.foreach(println)
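    // Expected output (order may vary): 2, 4, 6, 8, 10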
  }

  /**
    * filter: keep only the even numbers in the collection
    *
    * @param sc
    */
  def filter(sc: SparkContext): Unit = {
    val numlist = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numberRDD = sc.parallelize(numlist)
    /**
      * Use the filter operator to keep only the even numbers in the collection.
      */
    val evenNumRDD = numberRDD.filter(_ % 2 == 0)
    evenNumRDD.foreach(println)
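    // Expected output (order may vary): 2, 4, 6, 8, 10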
  }

  /**
    * flatMap: split lines of text into individual words
    */
  def flatMap(sc: SparkContext): Unit = {
    val lines = List("hello you", "hello me", "hello world")
    val linesRDD = sc.parallelize(lines)
    /**
      * Use the flatMap operator to split each line into individual words.
      */
    val wordsRDD = linesRDD.flatMap(_.split(" "))
    wordsRDD.foreach(println)
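    // Expected output, one word per line (order may vary): hello you hello me hello world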
  }

  /**
    * groupByKey: group the scores by class
    */
  def groupByKey(sc: SparkContext): Unit = {
    val scoreList = List(Tuple2("class1", 90), Tuple2("class1", 92), Tuple2("class2", 50), Tuple2("class2", 60))
    val scoreRDD = sc.parallelize(scoreList)
    /**
      * Use the groupByKey operator to group the scores by class.
      */
    val groupedRDD = scoreRDD.groupByKey()
    groupedRDD.foreach(println)
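    // Expected output (order may vary):
    // (class1,CompactBuffer(90, 92))
    // (class2,CompactBuffer(50, 60))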
  }

  /**
    * reduceByKey: compute the total score of each class
    *
    * @param sc
    */
  def reduceByKey(sc: SparkContext): Unit = {
    val numlist = List(Tuple2("class1", 90), Tuple2("class1", 92), Tuple2("class2", 50), Tuple2("class2", 60))
    val numberRDD = sc.parallelize(numlist)
    val retRDD = numberRDD.reduceByKey(_ + _)
    retRDD.foreach(println)
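    // Expected output (order may vary):
    // (class1,182)
    // (class2,110)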

  }

  /**
    * sortByKey: sort the class scores by score
    */
  def sortByKey(sc: SparkContext): Unit = {
    val numlist = Array(Tuple2(65, "Tom"), Tuple2(50, "jack"), Tuple2(100, "luce"), Tuple2(85, "ff"))
    val scoreRDD = sc.parallelize(numlist)
    // Passing false sorts in descending order; the default is true (ascending).
    val retValuRDD = scoreRDD.sortByKey(false)
    retValuRDD.collect.foreach(println)
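    // Expected output (descending by key):
    // (100,luce)
    // (85,ff)
    // (65,Tom)
    // (50,jack)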
  }

  /**
    * join: print every student's score
    */
  def join(sc: SparkContext): Unit = {
    val students = Array(
      Tuple2(1, "tom"),
      Tuple2(2, "jack"),
      Tuple2(3, "marry"),
      Tuple2(4, "ff"))

    val scores = Array(
      Tuple2(1, 80),
      Tuple2(2, 90),
      Tuple2(3, 100),
      Tuple2(1, 60),
      Tuple2(2, 80))

    val studentRDD = sc.parallelize(students)
    val scoreRDD = sc.parallelize(scores)
    val retRDD = studentRDD.join(scoreRDD)
    retRDD.foreach(println)

    /**
      * (1,(tom,80))
      * (2,(jack,90))
      * (1,(tom,60))
      * (2,(jack,80))
      * (3,(marry,100))
      */

  }

  /**
    * cogroup: print every student's scores
    */
  def cogroup(sc: SparkContext): Unit = {
    val students = Array(
      Tuple2(1, "tom"),
      Tuple2(2, "jack"),
      Tuple2(3, "marry"),
      Tuple2(4, "ff"))

    val scores = Array(
      Tuple2(1, 80),
      Tuple2(2, 90),
      Tuple2(3, 100),
      Tuple2(1, 60),
      Tuple2(2, 80))

    val studentRDD = sc.parallelize(students)
    val scoreRDD = sc.parallelize(scores)
    val retRDD = studentRDD.cogroup(scoreRDD)
    retRDD.foreach(println)

    /**
      * (4,(CompactBuffer(ff),CompactBuffer()))
      * (1,(CompactBuffer(tom),CompactBuffer(80, 60)))
      * (2,(CompactBuffer(jack),CompactBuffer(90, 80)))
      * (3,(CompactBuffer(marry),CompactBuffer(100)))
      */
  }


}
