天天看點

effective java 3-第7章 lambda和stream[48]謹慎使用Stream并行

作者:CC挑燈夜讀1谷

在主流程式設計語言中,Java一直走在簡化并發程式設計任務的最前沿。1996年Java釋出時,就通過同步和wait/notify内置了對線程的支援。Java5 引入了java.util.concurrent類庫,提供了并行集包(concurrent collection)和執行者架構(executor framework) 。Java7引入了 fork-join包,這是一個處理并行分解的高性能架構。Java8引入了Stream,隻需要調用一次parallel方法就可以實作并行處理。在Java中編寫并發程式變得越來越容易,但是要編寫出正确又快速的并發程式,則一向沒那麼簡單。安全性和活性失敗是并發程式設計中需要面對的問題,Stream pipeline并行也不例外。

請看摘自45條的這段程式:

public static void main(String[] args) {
        primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE) )
                .filter(mersenne -> mersenne.isProbablePrime(50))
                .limit(20)
                .forEach(System.out::println);

    }

    static Stream<BigInteger> primes(){
        return Stream.iterate( TWO,BigInteger::nextProbablePrime);
    }           

在我的機器上,這段程式會立即開始列印素數,玩完成運作花了12.5秒。假設我天真地想通過在Stream pipeline 上添加一個parallel() 調用來提速。你認為這樣會對其性能産生什麼樣的影響呢?運作速度會稍微快一點點嗎?還是會慢一點點?遺憾的是,其結果是根本不列印任何内容了,CPU的使用率卻定在90%一動不動了(活性失敗)。程式最後可能會終止,但是我不想一探究竟,半個小時後就強行把它終止了。

這是怎麼回事呢?簡單地說,Stream類庫不知道如何并行這個pipeline,以及如何探索失敗。即便在最佳環境下,如果源頭是來自于Stream.iterator,或者使用了中間操作的limit,那麼并行pipeline 也不可能提升性能。這個pipeline 必須同時滿足這兩個條件。更糟糕的是,預設的并行政策在處理limit 的不可預知性時,是假設額外多處理幾個元素,并放棄任何不需要的結果,這些都不會影響性能。在這種情況下,它查找梅森素數時,所花費的時間大概是查找之前元素的兩倍。因而,額外多計算一個元素的成本,大概相當于計算所有之前元素總和的時間,這個貌似無傷大雅的pipeline,卻使得自動并行算法瀕臨崩潰。這個故事的寓意很簡單:千萬不要任意地并行Stream pipeline。它造成的性能後果有可能是災難性的。

總之,在Stream 上通過并行獲得的性能,最好是通過ArrayList、HashMap、HashSet 和ConcurrentHashMap 執行個體,數組,int 範圍和long 範圍等。這些資料結果的共性是,都可以被精确、輕松地分成任意大小的子範圍,使并行線程中的分工變得更加輕松。Stream 類庫用來執行這個任務的抽象是分割疊代器(spliterator),它是由Stream 和 Iterable 中的 spliterator方法傳回。

這些資料結構共有e另一項重要特性是,在進行順序處理時,它們提供了優異的引用局部性(locality of reference):序列化的元素引用一起儲存在記憶體中。被那些引用通路到的對象在記憶體中可能不是一個緊挨着一個,這降低了引用局部性。事實證明,引用局部性對于并行批處理來說至關重要:沒有它,線程就會出現閑置,需要等待資料從記憶體轉移到處理器的緩存。具有最佳引用局部性的資料結構是基本類型資料數組,因為資料本身是相鄰地儲存在記憶體中的。

Stream pipeline 的終止操作本質上也影響了并發執行的效率。如果大量的工作在終止操作中完成,而不是全部工作在pipeline中完成,并且這個操作是固有的順序,那麼并行pipeline的效率就會受到限制。并行的最佳終止操作是做減法(reduction),用一個Stream的reduce方法,将所有從pipeline産生的元素都合并在一起,或者預先打包像min、max、count、和sum這類方法。驟死式操作(short-circuiting operation)如 anyMach、allMatch和noneMatch也都可以并行。由Stream的collect方法執行的操作,都是可變的減法,不是并行的的最好選擇,因為合并集合的成本非常高。

如果是自己編寫Stream 、Iterable或者Collection實作,并且想要得到适當的并行性能,就必須覆寫spliterator方法,并廣泛地測試結果Stream 的并行性能。編寫高品質的分隔疊代器很困難,并且超出了本書的讨論範疇。

并行Stream 不僅可能降低性能,包括活性失敗,還可能導緻結果出錯,以及難以預計的行為(如安全性失敗)。安全性失敗可能是因為并行的pipeline使用了映射、過濾器或者程式員自己編寫的其他函數對象,并且沒有遵守他們的規範。Stream 規範對于這些函數對象有着嚴格的要求條件。例如,傳到Stream的reduce操作的收集器函數群組合器函數,必須是有關聯、互不幹擾,并且是無狀态的。如果不滿足這些條件(在46條中提到了一些),但是按序列運作pipeline,可能會得到正确的結果;如果并發運作,則可能會突發性失敗。

以上值得注意的是,并行的梅森素數程式雖然運作完成了,但是并沒有按正确的順序(升序)列印出來。為了儲存序列化版本程式顯示的順序,必須用forEachOrdered代替終止操作的forEach,它可以確定按encounter順序周遊并行的Stream。

假如在使用的是一個可以有效分隔的源Stream,一個可并行的或者簡單的終止操作,以及互不幹擾的函數對象,那麼将無法獲得通過并行實作的提速,除非pipeline完成了足夠的實際工作,抵消了與并行相關的成本。據不完全統計,Stream中的元素數量是每個元素所執行的代碼行數的很多倍,至少是十萬倍[Lea 14]。

切記:并行Stream 是一項嚴格的性能優化。對于任何優化都必須在改變前後對性能進行測試,以確定值得這麼做(item 67)。最理想的是在現實的系統設定中進行測試。一般來說,程式中所有并行Stream pipeline 都是在一個通用的fork-join池中運作得。隻要有一個pipeline 運作異常,都會損害到系統中其他不相關部分的性能。

聽起來貌似在并行Stream pipeline 時怪事連連,其實正是如此。我有個朋友,他發現在大量使用Stream 的幾百萬行代碼中,隻有少數幾個并行Stream 是有效的。這并不意味着應該避免使用并行Stream 。在适當的條件下,給Stream pipeline 添加 parallel 調用,确實可以在多處理器核的情況下實作近乎線性的倍增。某些域如機器學習和資料處理,尤其适用于這樣的提速。

簡單舉一個并行Stream pipeline 有效的例子。假設下面這個函數是用來計算 π(n) ,素數的數量少于或者等于n :

// Prime-counting stream pipeline - benefits from parallelization
    static long pi(long n) {
        return LongStream.rangeClosed(2,n)
                .mapToObj(BigInteger::valueOf)
                .filter(i -> i.isProbablePrime(50))
                .count();
    }           

在我的(作者)機器上,這個函數花31秒完成了計算 π(10^8) 。隻要添加一個 parallel() 調用,就把調用時間減少到了9.2s

// Prime-counting stream pipeline -  parallel version
    static long pi(long n) {
        return LongStream.rangeClosed(2,n)
                .parallel()
                .mapToObj(BigInteger::valueOf)
                .filter(i -> i.isProbablePrime(50))
                .count();
    }           

換句話說,并行計算在我的四核機器上添加了parallel 調用後,速度加快了3.7倍。值得注意的是,這并不是在實踐中計算n值很大時的 π(n) 的方法。還有更加高效的算法,如著名的Lehmer公式。

如果要并行一個随機數的Stream ,應該從SplittableRandom執行個體開始,而不是從ThreadLocalRandom(或實際上已經過時的Random)開始。SplittableRandom正是專門為此設計的,還有線性提速的可能。ThreadLocalRandom 則隻用于單線程,它将自身當做一個并行的Stream 源運用到函數中,但是沒有SplittableRandom 那麼快。Random在每個操作上都進行同步,是以會導緻濫用,扼殺了并行的優勢。

總而言之,盡量不要并行Stream pipeline ,除非有足夠的理由相信它能保證計算的正确性,并且能加快程式的運作速度。如果對Stream 進行不恰當的并行操作,可能導緻程式運作失敗,或者造成性能災難。如果确信并行是可行的,并發運作時一定要確定代碼正确,并在真實環境下認真地進行性能測量。如果代碼正确,這些實驗也證明它有助于提升性能,隻有這時候,才可以在編寫代碼時并行Stream。