天天看點

擁抱 Java 8 并行流吧,速度飛起!

前言

在 Java7 之前,如果想要并行處理一個集合,我們需要以下幾步:

手動分成幾部分

為每部分建立線程

在适當的時候合并。并且還需要關注多個線程之間共享變量的修改問題。

而 Java8 為我們提供了并行流,可以一鍵開啟并行模式。是不是很酷呢?讓我們來看看吧

并行流

認識和開啟并行流

什麼是并行流: 并行流就是将一個流的内容分成多個資料塊,并用不同的線程分别處理每個不同資料塊的流。例如有這麼一個需求:

有一個 List 集合,而 list 中每個 apple 對象隻有重量,我們也知道 apple 的單價是 5元/kg,現在需要計算出每個 apple 的單價,傳統的方式是這樣:

List<Apple> appleList = new ArrayList<>(); // 假裝資料是從庫裡查出來的

for (Apple apple : appleList) {
    apple.setPrice(5.0 * apple.getWeight() / 1000);
}      

我們通過疊代器周遊 list 中的 apple 對象,完成了每個 apple 價格的計算。而這個算法的時間複雜度是 O(list.size()) 随着 list 大小的增加,耗時也會跟着線性增加。并行流可以大大縮短這個時間。并行流處理該集合的方法如下:

appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000));      

和普通流的差別是這裡調用的 parallelStream() 方法。當然也可以通過 stream.parallel() 将普通流轉換成并行流。推薦看下:Java 8 建立 Stream 的 10 種方式,更多可以關注Java技術棧公衆号回複java擷取系列教程。

并行流也能通過 sequential() 方法轉換為順序流,但要注意:流的并行和順序轉換不會對流本身做任何實際的變化,僅僅是打了個标記而已。并且在一條流水線上對流進行多次并行 / 順序的轉換,生效的是最後一次的方法調用

并行流如此友善,它的線程從那裡來呢?有多少個?怎麼配置呢?

并行流内部使用了預設的 ForkJoinPool 線程池。預設的線程數量就是處理器的核心數,而配置系統核心屬性:java.util.concurrent.ForkJoinPool.common.parallelism 可以改變線程池大小。不過該值是全局變量。

改變他會影響所有并行流。目前還無法為每個流配置專屬的線程數。一般來說采用處理器核心數是不錯的選擇

測試并行流的性能

為了更容易的測試性能,我們在每次計算完蘋果價格後,讓線程睡 1s,表示在這期間執行了其他 IO 相關的操作,并輸出程式執行耗時,順序執行的耗時:

public static void main(String[] args) throws InterruptedException {
    List<Apple> appleList = initAppleList();

    Date begin = new Date();
    for (Apple apple : appleList) {
        apple.setPrice(5.0 * apple.getWeight() / 1000);
        Thread.sleep(1000);
    }
    Date end = new Date();
    log.info("蘋果數量:{}個, 耗時:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);
}      

并行版本

List<Apple> appleList = initAppleList();

Date begin = new Date();
appleList.parallelStream()
.forEach(apple ->
         {
             apple.setPrice(5.0 * apple.getWeight() / 1000);
             try {
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
        );
Date end = new Date();
log.info("蘋果數量:{}個, 耗時:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);      

耗時情況

擁抱 Java 8 并行流吧,速度飛起!

跟我們的預測一緻,我的電腦是 四核I5 處理器,開啟并行後四個處理器每人執行一個線程,最後 1s 完成了任務!

并行流可以随便用嗎?

可拆分性影響流的速度

通過上面的測試,有的人會輕易得到一個結論:并行流很快,我們可以完全放棄 foreach/fori/iter 外部疊代,使用 Stream 提供的内部疊代來實作了。

事實真的是這樣嗎?并行流真的如此完美嗎?答案當然是否定的。大家可以複制下面的代碼,在自己的電腦上測試。測試完後可以發現,并行流并不總是最快的處理方式。

對于 iterate 方法來處理的前 n 個數字來說,不管并行與否,它總是慢于循環的,非并行版本可以了解為流化操作沒有循環更偏向底層導緻的慢。可并行版本是為什麼慢呢?這裡有兩個需要注意的點:

iterate 生成的是裝箱的對象,必須拆箱成數字才能求和

我們很難把 iterate 分成多個獨立的塊來并行執行

這個問題很有意思,我們必須意識到某些流操作比其他操作更容易并行化。對于 iterate 來說,每次應用這個函數都要依賴于前一次應用的結果。是以在這種情況下,我們不僅不能有效的将流劃分成小塊處理。反而還因為并行化再次增加了開支。

而對于 LongStream.rangeClosed() 方法來說,就不存在 iterate 的第兩個痛點了。它生成的是基本類型的值,不用拆裝箱操作,另外它可以直接将要生成的數字 1 - n 拆分成 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 這樣四部分。是以并行狀态下的 rangeClosed() 是快于 for 循環外部疊代的

package lambdasinaction.chap7;

import java.util.stream.*;

public class ParallelStreams {

    public static long iterativeSum(long n) {
        long result = 0;
        for (long i = 0; i <= n; i++) {
            result += i;
        }
        return result;
    }

    public static long sequentialSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();
    }

    public static long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();
    }

    public static long rangedSum(long n) {
        return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();
    }

    public static long parallelRangedSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();
    }

}

package lambdasinaction.chap7;

import java.util.concurrent.*;
import java.util.function.*;

public class ParallelStreamsHarness {

    public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();

    public static void main(String[] args) {
        System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs");
        System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs");
        System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" );
        System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");
        System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" );
    }

    public static <T, R> long measurePerf(Function<T, R> f, T input) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            R result = f.apply(input);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Result: " + result);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }
}      

共享變量修改的問題

并行流雖然輕易的實作了多線程,但是仍未解決多線程中共享變量的修改問題。下面代碼中存在共享變量 total,分别使用順序流和并行流計算前n個自然數的和

public static long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).forEach(accumulator::add);
    return accumulator.total;
}

public static long sideEffectParallelSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
    return accumulator.total;
}

public static class Accumulator {
    private long total = 0;

    public void add(long value) {
        total += value;
    }
}      

順序執行每次輸出的結果都是:50000005000000,而并行執行的結果卻五花八門了。這是因為每次通路 totle 都會存在資料競争,關于資料競争的原因,大家可以看看關于 volatile 的部落格。是以當代碼中存在修改共享變量的操作時,是不建議使用并行流的。

并行流的使用注意

在并行流的使用上有下面幾點需要注意:

盡量使用 LongStream / IntStream / DoubleStream 等原始資料流代替 Stream 來處理數字,以避免頻繁拆裝箱帶來的額外開銷

要考慮流的操作流水線的總計算成本,假設 N 是要操作的任務總數,Q 是每次操作的時間。N * Q 就是操作的總時間,Q 值越大就意味着使用并行流帶來收益的可能性越大

例如:前端傳來幾種類型的資源,需要存儲到資料庫。每種資源對應不同的表。我們可以視作類型數為 N,存儲資料庫的網絡耗時 + 插入操作耗時為 Q。一般情況下網絡耗時都是比較大的。是以該操作就比較适合并行處理。當然當類型數目大于核心數時,該操作的性能提升就會打一定的折扣了。更好的優化方法在日後的部落格會為大家奉上

對于較少的資料量,不建議使用并行流

容易拆分成塊的流資料,建議使用并行流

以下是一些常見的集合架構對應流的可拆分性能表:

擁抱 Java 8 并行流吧,速度飛起!

碼字不易,如果你覺得讀完以後有收獲,不妨點個推薦讓更多的人看到吧!