天天看點

java8新特性---StreamApi

概述

我們都知道sql語句可以讓我們很友善實作一些需求例如,取TopN、排序、過濾等操作。學過scala的同學應該知道,scala中提供了很多的算子也可以很友善的進行一些資料的處理,java中可能就沒那麼多算子了,需要自定義去實作,但是現在java8中給我們提供了Stream Api彌補了這裡劣勢,提供了很多方法,不用sql也可以實作。

StreamApi

  1. Stream API 三部曲

    建立 Stream—> 中間操作(Transform) lazy—>終止操作(action)

建立Steeam的方式
import org.junit.Test;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/*
 * 一、Stream API 的操作步驟:
 *
 * 1. 建立 Stream
 *
 * 2. 中間操作(Transform) lazy
 *
 * 3. 終止操作(action)
 *
 * 其實這個Stream 類似于Spark中的RDD操作首先建立一個Stream(存放資料的容器),
 * 然後調用各種算子進行transform操作,但這是一個lazy的過程,也就是不執行最後的
 * action操作,是不會有結果的.下面通過例子進行學習.
 */
public class StreamApiTrain {

    // 1 建立Stream
    @Test
    public void test1(){
        /**
        * 方式1: 通過Collection 提供的Stream() ParallelStream()
        */
        List<String> list = Arrays.asList("java","spark","hadoop","scala");
        //擷取一個順序流
        Stream<String> stream = list.stream();
        stream.forEach(System.out::println);
        //擷取一個并行流
        Stream<String> stringStream = list.parallelStream();
        stringStream.forEach(System.out::println);

        /**
         * 方式2: 通過 Arrays 中的 stream() 擷取一個數組流
         */
        String [] strings = new String[5];
        Stream<String> stream1 = Arrays.stream(strings);

        /**
         * 方式3: 通過 Stream 類中靜态方法 of()
         */
        Stream<String> stream2 = Stream.of("java", "spark", "hadoop", "scala");

        /**
         * 方式4: 建立無限流
         */
        Stream<Integer> iterate = Stream.iterate(0, x -> x + 2)
                .limit(10);
        iterate.forEach(System.out::println);

        Stream<Double> generate = Stream.generate(()->Math.random())
                .limit(10);
        generate.forEach(System.out::println);
    }
           
Transform(中間操作)
/**
     * Transform:
     * 1.filter 過濾
     * 2.limit 取Top n
     * 3.skip(n) —— 跳過元素,傳回一個扔掉了前 n 個元素的流。若流中元素不足 n 個,則傳回一個空流。與 limit(n) 互補
     * 4.distinct 去重
     * 5.sorted  自然排序   sorted(Comparator com)——定制排序
     */

    @Test
    public void test2(){
        List<String> list = Arrays.asList("java","spark","hadoop","scala","hive","zookeeper","kafka","spark","java");
        list.stream()
                .filter(x->{
                    System.out.println("過濾操作");
                    return x.length() > 4;
                })
                .limit(3)
                .forEach(System.out::println);

        System.out.println("--------------------");
        list.stream().skip(2).forEach(System.out::println);

        System.out.println("--------------------");
        list.stream().distinct().forEach(System.out::println);

        System.out.println("--------------------");
        list.stream().sorted().forEach(System.out::println);

        System.out.println("--------------------");
        list.stream().sorted((x,y)->{
            if (x.length() == y.length()){
                return x.compareTo(y);
            }else {
               return Integer.compare(x.length(),y.length());
            }
        }).forEach(System.out::println);
    }
           
  • map vs flatMap
/**
     * Transform:
     * map  vs  flatMap
     *
     * map——接收 Lambda , 将元素轉換成其他形式或提取資訊。接收一個函數作為參數,該函數會被應用到每個元素上,并将其映射成一個新的元素。
     * flatMap——接收一個函數作為參數,将流中的每個值都換成另一個流,然後把所有流連接配接成一個流
     */
    @Test
    public void test3(){
        List<String> list = Arrays.asList("java spark hadoop scala hive zookeeper kafka spark java");
        list.stream()
            .map(String::toUpperCase)
            .forEach(System.out::println);

        System.out.println("--------------------");

        Stream<String> stringStream = list.stream().flatMap(x -> Stream.of(x.split(" ")));
        stringStream.forEach(System.out::println);


    }
           
  • reduce
/**
     * reduce: 該方法可以傳遞多個參數(源碼中給了很多例子)
     *
     * Optional<T> reduce(BinaryOperator<T> accumulator); 傳遞兩個參數:第一個參數是上次函數執行的傳回值(也稱為中間結果),第二個參數是stream中的元素,這個函數把這兩個值相加,得到的和會被指派給下次執行這個函數的第一個參數
     *
     * T reduce(T identity, BinaryOperator<T> accumulator); identity相當于給一個初始值,在初始值的基礎上進行操作
     *
     * <U> U reduce(U identity,
     *                  BiFunction<U, ? super T, U> accumulator,
     *                  BinaryOperator<U> combiner);
     * identity: 一個初始化的值;這個初始化的值其類型是泛型U,與Reduce方法傳回的類型一緻
     * accumulator: 其類型是BiFunction,輸入是U與T兩個類型的資料,而傳回的是U類型;
     *              也就是說傳回的類型與輸入的第一個參數類型是一樣的,而輸入的第二個參數類型與Stream中元素類型是一樣的。
     * combiner: 其類型是BinaryOperator,支援的是對U類型的對象進行操作;
     * 第三個參數combiner主要是使用在并行計算的場景下;如果Stream是非并行時,第三個參數實際上是不生效的。
     */
    List<User> emps = Arrays.asList(
            new User(2, "李四", 59, 6666.66),
            new User(1, "張三", 18, 9999.99),
            new User(3, "王五", 28, 3333.33),
            new User(4, "趙六", 8, 7777.77),
            new User(1, "張三", 18, 9999.99),
            new User(4, "趙六", 8, 7777.77),
            new User(5, "田七", 38, 5555.55)
    );

    @Test
    public void test4(){
        //計算user總人數
        Optional<Integer> count = emps.stream()
                                      .map((e) -> 1)
                                      .reduce((x,y)->x+y);
        //System.out.println(count.get());

        //計算薪水總和
        Double totalSalary = emps.stream()
                                 .map(x -> x.getSalary())
                                 .reduce(0.00, (x, y) -> x + y);
        //System.out.println(totalSalary);

        //測試非并行
        Integer reduce = emps.stream()
                            .map(x -> x.getId())
                            .reduce(2, ((x1, y1) -> {
                                System.out.println("x1,y1   "+x1+":"+y1);
                                System.out.println("x1+y1   "+x1+y1);
                                return x1+y1;
                            }), (x2, y2) -> {
                                System.out.println("x2,y2   "+x2+":"+y2);
                                System.out.println("x2+y2   "+x2+y2);
                                return x2+y2;
                            });
        System.out.println(reduce); //22(2+2+1+3+4+1+4+5)  非并行參數三不生效

        //測試并行
        Integer reduce1 = emps.parallelStream()
                .map(x -> x.getId())
                .reduce(2, ((x1, y1) -> {
                    System.out.println("x1,y1   "+x1+":"+y1);
                    System.out.println("x1+y1   "+x1+y1);
                    return x1+y1;
                }), (x2, y2) -> {
                    System.out.println("x2,y2   "+x2+":"+y2);
                    System.out.println("x2+y2   "+x2+y2);
                    return x2+y2;
                });
        System.out.println(reduce1); //34 = (2+2)+(2+1)+(2+3)+(2+4)+(2+1)+(2+4)+(2+5)  并行參數三生效

    }

    @Test
    public void test5() {
        List<String> list = Arrays.asList("hello","hadoop","hive","hadoop","hadoop","hello");
        Map<String, List<String>> collect = list.stream().collect(Collectors.groupingBy((x)->x));
        System.out.println(collect.values());
        Stream<Map<String,Integer>> stream = collect.values()
                .stream()
                .map((x)->{
                    Map<String, Integer> map=new HashMap<>();
                    map.put(x.get(0), x.size());return map;
                });
        stream.forEach(System.out::println);
    }
           
action操作
/**
     * action操作:
     * allMatch——檢查是否比對所有元素
     * anyMatch——檢查是否至少比對一個元素
     * noneMatch——檢查是否沒有比對的元素
     * findFirst——傳回第一個元素
     * findAny——傳回目前流中的任意元素
     * count——傳回流中元素的總個數
     * max——傳回流中最大值
     * min——傳回流中最小值
     * forEach-周遊元素
     */
    @Test
    public void test6(){
        List<String> list = Arrays.asList("java","spark","hadoop","scala","hive","zookeeper","kafka","spark","java");
        boolean b = list.stream()
                        .allMatch(x -> x.length() > 3); //判斷是否是以元素長度都大于3
        System.out.println(b);


        boolean b1 = list.stream()
                         .anyMatch(x -> x.length() > 4); //判斷至少有一個元素長度大于4
        System.out.println(b1);


        Optional<String> first = list.stream()
                                     .findFirst();
        System.out.println(first.get());


        Optional<String> any = list.stream()
                                   .findAny();
        System.out.println(any.get());


        long count = list.stream().count();
        System.out.println(count);


        //注意:流進行了終止操作後,不能再次使用
    }

    /**
     * action操作:
     * collect--将流轉換為其他形式。接收一個 Collector接口的實作,用于給Stream中元素做彙總的方法
     * collect方法中需要傳遞一個Collector,而Collectors類可以實作(源碼中提供了很多例子)
     * Collectors類中提供了很多方法進行操作.
     */
    @Test
    public void test7(){
        //取user中的名字放入list中
        List<String> collect = emps.stream()
                                   .map(User::getName)
                                   .collect(Collectors.toList());
        collect.forEach(System.out::println);

        //根據id進行分組
        Map<Integer, List<User>> collect1 = emps.stream()
                .collect(Collectors.groupingBy(User::getId));
        collect1.forEach((key,value)-> System.out.println(key+":"+value));

        //根據薪水和年齡進行分區
        Map<Boolean, List<User>> collect2 = emps.stream()
                .collect(Collectors.partitioningBy(x -> (x.getSalary() > 6000) && x.getAge()<30));
        collect2.forEach((key,value)-> System.out.println(key+":"+value));

        //取User中名字進行joining操作
        String collect3 = emps.stream().map(User::getName).collect(Collectors.joining(","));
        System.out.println(collect3);
    }

}