天天看點

Java 8 的新特性Stream,以及Java7的Fork/Join架構Java 8 的新特性Stream,以及Java7的Fork/Join架構

Java 8 的新特性Stream,以及Java7的Fork/Join架構

文章目錄

  • Java 8 的新特性Stream,以及Java7的Fork/Join架構
    • 一、流
    • 二、流的操作
      • 2.1 篩選、去重、截斷、跳過
      • 2.2 映射:map、flatMap
      • 2.3 查找和比對
      • 2.4 歸約 reduce
      • 2.5 數值流
      • 2.6 建立流
    • 三、流的收集器
      • 3.1 `collect`、`Collector`、`Collectors` 的差別
      • 3.2 歸約和彙總
      • 3.3 分組
      • 3.4 分區
      • 3.5 收集器接口
        • (1) `Collector` 接口
        • (2) 使用stream.collect(supplier, accumulator, combiner) 的重載方法
      • 3.6 并行資料處理與性能高
        • (1)并行流與串行流的轉化
        • (2)預設并行流線程池:`ForkJoinPool`
        • (3) 并行流的反例
          • 反例一:使用不易并行化的操作導緻,性能變差
          • 反例二:使用共享變量導緻結果錯誤
        • (4)何時使用并行流
      • 3.7 分支/合并架構
        • (1)建立`RecursiveTask ` 或`RecursiveAction`的子類
        • (2)調用`new ForkJoinPool().invoke()`
        • (3)使用分支合并架構的注意事項
      • 3.8 Spliterator 可分疊代器
    • 其他、Optional和OptionalInt

一、流

  • 流像一個延遲建立的集合
  • 流隻能消費一次
  • 流利用了内部疊代

二、流的操作

流的操作分為:中間操作、終端操作。

流的延遲特性:中間操作可以連成一個操作流水線,除非發出一個終端操作,中間操作不會執行任何處理。

利用延遲特性:短路、循環合并。

2.1 篩選、去重、截斷、跳過

  • filter()

  • distinct()

    根據 hashCode() 和 equals() 去除
  • limit()

    截斷
  • skip()

    ,跳過前面n個元素,和limit() 截然相反

2.2 映射:map、flatMap

  • map 映射: 根據一個元素,建立出一個新的元素
  • flatMap 扁平化:把一個流中的每一個值都換成另一個流,然後把所有流連接配接起來成為一個流。
String[] words = {"hello", "world"};
// [h, e, l, l, o, w, o, r, l, d]
List<String> charList = Arrays.stream(words).map(item -> item.split("")).flatMap(Arrays::stream).collect(Collectors.toList());
           

題目:

給定兩個數字清單,如何傳回所有的數對呢?例如,給定清單[1, 2, 3]和清單[3, 4],應

該傳回[(1, 3), (1, 4), (2, 3), (2, 4), (3, 3), (3, 4)]。為簡單起見,你可以用有兩個元素的數組來代

表數對。
           
Integer[][] aa = {{1, 2, 3},{3, 4}};
        List<Integer[]> pairs = Arrays.stream(aa[0])
                .flatMap(i -> Arrays.stream(aa[1])
                        .map(j -> new Integer[]{i, j})
                ).collect(Collectors.toList());
           

2.3 查找和比對

比對

  • anyMatch()

    檢查是否至少比對一個元素,傳回boolean
  • allMatch()

    檢查是否比對所有元素,傳回boolean
  • noneMath()

    流中沒有任何元素與給定的判斷比對

查找

  • findAny()

    傳回任意一個
  • findFirst()

    查找第一個元素

2.4 歸約 reduce

求和:

List<Integer> a = new ArrayList<>(Arrays.asList(3, 4, 5, 6, 8));
Integer res = a.stream().reduce(0, (i, j) -> i + j);
Integer res1 = a.stream().reduce(0, Integer::sum);
Optional<Integer> res2 = a.stream().reduce(Integer::sum);
System.out.println(res2.orElse(0));
           

最大值和最小值:

Optional<Integer> maxOptional = a.stream().reduce(Integer::max);
System.out.println(maxOptional.orElse(0));
Optional<Integer> minOptional = a.stream().reduce(Integer::min);
System.out.println(minOptional.orElse(0));
           

使用流中内置的方法統計個數:

long count = a.stream().count();
           

2.5 數值流

  • 原始類型流特化:避免裝箱成本。IntStream、DoubleStream、LongStream。
  • 映射為數值流: mapToInt() ,傳回一個IntStream
  • 轉回對象流:intStream.boxed()
  • 數值範圍:通過數值流, intStream.range() 和 intStream.rangeClosed()

2.6 建立流

  • 由值建立流
    Stream<Integer> stream = Stream.of(2, 3, 4, 5, 6);
               
  • 由數組建立流
    Stream<Integer> stream1 = Arrays.stream(new Integer[]{3, 5, 6, 7});
               
  • 由檔案生成流
    // 由檔案生成流
            String filePath ="pom.xml";
            try (Stream<String> fileStream = Files.lines(Paths.get(filePath), Charset.defaultCharset())){
                long count = fileStream.flatMap(line -> Arrays.stream(line.split(" "))).count();
                System.out.println(count);
            } catch (IOException e) {
                e.printStackTrace();
            }
               
  • 由函數生成流:建立無限流
    • Stream.iterate()

    • Stream.generate()

    // 列印前十個偶數
    Stream.iterate(0, n->n+2).limit(10).forEach(System.out::println);
    // 斐波那契數列
    Stream.iterate(new Integer[]{0, 1}, a->new Integer[]{a[1], a[0]+a[1]})
                    .limit(10).map(a->a[0]).forEach(k-> System.out.print(k+ " "));
               
    generate 生成
    Stream.generate(Math::random).limit(5).forEach(System.out::println);
    // generate 産生斐波那契序列
    IntStream.generate(new IntSupplier() {
                private int pre = 0;
                private int cur = 1;
                @Override
                public int getAsInt() {
                    int oldPre = this.pre;
                    int nexVal = this.pre + this.cur;
                    this.pre = this.cur;
                    this.cur = nexVal;
                    return oldPre;
                }
            }).limit(10).forEach(k-> System.out.print(k+ " "));
               

三、流的收集器

3.1

collect

Collector

Collectors

的差別

  • collect()

    觸發歸約操作
  • Collector

    是接口。其方法的實作決定了如何對流進行歸約
  • Collectors

    提供了很多靜态方法,可以很友善的建立常見的收集器;

3.2 歸約和彙總

  • Collectors.counting()

    統計數量
    Long count = list.stream().collect(Collectors.counting());
    long count1 = list.stream().count();
               
  • Collectors.maxBy()

    Collectors.minBy()

    最大值和最小值
    Optional<String> maxOpt = list.stream().collect(Collectors.maxBy(String::compareTo));
    Optional<String> minOpt = list.stream().collect(Collectors.minBy(String::compareTo));
               
  • 彙總,求和、求平均
    • Collectors.summingInt()

    • Collectors.summingDouble()

    • Collectors.summingLong()

    求平均
    • Collectors.averagingInt()

    • Collectors.averagingDouble()

    • Collectors.averagingLong()

    Integer sum = Arrays.stream(a).collect(Collectors.summingInt(Integer::intValue));
    Double avg = Arrays.stream(a).collect(Collectors.averagingInt(Integer::intValue));
               
  • 同時獲得:最大值、最小值、平均值、和
    • Collectors.summarizingInt()

    • Collectors.summarizingDouble()

    • Collectors.summarizingLong()

    IntSummaryStatistics summaryStatistics = Arrays.stream(a).collect(Collectors.summarizingInt(Integer::intValue));
               
  • 連接配接字元:

    Collectors.joining()

    String res = list.stream().collect(Collectors.joining());
    String res2 = list.stream().collect(Collectors.joining(","));
               
  • 廣義的歸約彙總:

    Collectors.reducing()

3.3 分組

  • Collectors.groupingBy()

    Map<Integer, List<Integer>> collect = Arrays.asList(a).stream().collect(Collectors.groupingBy(Integer::intValue));
    Map<String, List<Integer>> collect2 = Arrays.asList(a).stream().collect(Collectors.groupingBy(item -> {
                if (item > 3) {
                    return "aa";
                } else {
                    return "bb";
                }
            }));
               
  • Collectors.groupingBy(Dish::getName(), Collectors.groupingBy(Dish::getType()))

  • 傳遞給第一個groupingBy() 的第二個收集器可以是任何類型,而不一定是groupingBy()
    • Collectors.groupingBy(Dish::getName(), Collectors.counting())

    Map<String, Long> collect1 = Arrays.asList(a).stream().collect(Collectors.groupingBy(item -> {
                if (item > 3) {
                    return "aa";
                } else {
                    return "bb";
                }
            }, Collectors.counting()));
               
  • collectingAndThen

    将收集器的結果轉換成另外一種結果
    Map<String, Dish> map1 = dishList.stream().collect(Collectors.groupingBy(Dish::getType,
                    Collectors.collectingAndThen(Collectors.maxBy(Comparator.comparing(Dish::getName)),
                            Optional::get)));
               

3.4 分區

  • Collectors.partitioningBy()

    分組的特殊情況,将資料分為 true和false兩組。
// 判斷一個數值是否為質數
    private static boolean isPrime(int n) {
        return IntStream.range(2, n).noneMatch(v-> n % v==0);
    }

    // 判斷一個數值是否為質數   優化
    private static boolean isPrime2(int n) {
        int sqrt = (int)Math.sqrt(n);
        return IntStream.rangeClosed(2, sqrt).noneMatch(v-> n % v==0);
    }
           

3.5 收集器接口

(1)

Collector

接口

Collector<T,A,R>

: T 流的類型,A 為累加器類型,R為收集操作得到的類型;

  • supplier()

    : 建立新的結果容器; 建立空的累加器執行個體,供資料收集過程中使用;
  • accumulator()

    : 累加器
  • finisher()

    : 将累加器對象轉換成為整個集合操作的最終結果;
  • combiner

    : 合并兩個結果容器
  • characterististics

    : 傳回一個不可變得Characterististics集合,定義了收集器的行為
    • UNORDERED 歸約結果不受流中項目的周遊和累積順序的影響;
    • CONCURRENT 并行歸約流,它僅在用于無序資料源時才可以并行歸約;
    • IDENTIFY_FINISH 表明直接将累加器的結果作為最終結果

第一個示例:ToListCollector

第二個案例:擷取質數 , 及對比 對比兩種擷取質數的方法

(2) 使用stream.collect(supplier, accumulator, combiner) 的重載方法

ArrayList<String> res = list.stream().collect(ArrayList::new, List::add, List::addAll);
           

3.6 并行資料處理與性能高

(1)并行流與串行流的轉化

parallel

, 将串行流轉化成并行流;

sequential

,将并行流轉化成并行流;

parallel和sequential可以結合使用,在内部實際上是一個boolean辨別,最後一次的parallel或sequential調用會影響整個流水線。

(2)預設并行流線程池:

ForkJoinPool

  • 預設線程數的數量就是處理器數量:

    Runtime.getRuntime().availableProcessors()

    availableProcessors 看起來是處理器,實際上傳回的是可用核心的數量,包括超線程生成的虛拟核心。
  • 可以通過系統屬性,進行全局設定線程數:(沒有好的理由,強烈不建議修改)
    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "5");
               

(3) 并行流的反例

反例一:使用不易并行化的操作導緻,性能變差

在下面的案例中,在實測中發現,此處的并行流比串行流慢很多。如果采用不易并行化的操作,可能會讓程式的整體性能變得更差。

// 串行流
    public long sequentialSum2(long n) {
        return LongStream.iterate(1l, i->i+1l).limit(n).reduce(0l, Long::sum);
    }
    // 并行流
    public long parallelSum(long n) {
        return LongStream.iterate(1l, i->i+1l).limit(n)
                .parallel()
                .reduce(0l, Long::sum);
    }
           

将上面案例改為使用 range , 将得到并行的效率高于串行:

// 串行
    public long sequentialSum3(long n) {
        return LongStream.rangeClosed(1l, n).limit(n).reduce(0l, Long::sum);
    }
    
    //  并行
    public long parallelSum3(long n) {
        return LongStream.rangeClosed(1l, n).limit(n)
                .parallel()
                .reduce(0l, Long::sum);
    }
           
反例二:使用共享變量導緻結果錯誤
class Accumulator {
        public long total = 0;
        public void add(long value) {
            total += value;
        }
    }
    
    public long sideEffectParallelSum(long n) {
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1l, n).parallel().forEach(accumulator::add);
        return accumulator.total;
    }
           

(4)何時使用并行流

  • 留意裝箱;
  • 需要流中的n個元素,而不是前n個;
  • 估算:N個元素,處理每個元素的成本為Q,Q越大,并行處理的性能好的可能行更大;
  • 對于較小的資料量,不建議使用并行流;
  • 考慮資料結構是否更容易分解;
  • 考慮并行流處理的合并時的成本;

可分解性:

資料結構 可分解性
ArrayList 極好
LinkedList 極差
IntStream.range 極好
Stream.iterator 極好
HashSet
TreeSet

3.7 分支/合并架構

它是ExecutorService 接口的實作。把子任務配置設定給線程池(ForkJoinPool)。

分治算法的并行版本。

(1)建立

RecursiveTask<R>

RecursiveAction

的子類

隻需要實作其唯一的抽象方法

compute

  • 定義将任務拆分為子任務的邏輯;
  • 以及無法再拆分時,生成單個子任務結果的邏輯;
if (任務足夠小或不可分){
   順序計算該任務
}else {
   将任務分成兩個子任務
   遞歸調用本方法,拆分每個子任務,等待所有子任務完成
   合并每個子任務的結果
}
           

(2)調用

new ForkJoinPool().invoke()

在實際應用中,使用多個ForkJoinPool是沒有意義的。一般來說,将其執行個體化一次,然後把執行個體儲存在靜态字段中,使之成為單例。

一個ForkJoinPool的案例:MyRecursiveSumTask

(3)使用分支合并架構的注意事項

  • join()

    方法會阻塞調用方,是以有必要在兩個子任務的就算都開始之後再調用它;
  • 不應該在

    RecursiveTask

    内部調用

    ForkJoinPool()

    invoke

    方法,應該始終調用

    fork()

    compute()

    方法
  • 任務調用

    fork()

    方法可以把它排進

    ForkJoinPool()

    , 左右兩個任務同時調用

    fork()

    的效率 比 讓其中一個任務調用

    compute()

    的效率低,這樣做可以為其中一個子任務重用同一線程,避免線上程池多配置設定一個任務造成開銷;
  • 分支/合并架構需要“預熱”或者說要執行幾遍才會被JIT編譯器優化;
  • 劃分成許多小任務而不是大任務,更有助于工作線程的負載平衡;

    這其中使用了“工作竊取算法”:每個線程都有雙端隊列(儲存配置設定給該線程的任務),某一個線程任務隊列空了,會去其他任意一個線程任務隊列中的末尾取一個任務。

3.8 Spliterator 可分疊代器

案例,建立一個切分字元串的Spliterator:MyWordCountSpliterator

其他、Optional和OptionalInt

  • OPtional

    對象流傳回值
  • OptionalInt

    數值流傳回值