天天看点

effective java 3-第7章 lambda和stream[48]谨慎使用Stream并行

作者:CC挑灯夜读1谷

在主流编程语言中,Java一直走在简化并发编程任务的最前沿。1996年Java发布时,就通过同步和wait/notify内置了对线程的支持。Java5 引入了java.util.concurrent类库,提供了并行集包(concurrent collection)和执行者框架(executor framework) 。Java7引入了 fork-join包,这是一个处理并行分解的高性能框架。Java8引入了Stream,只需要调用一次parallel方法就可以实现并行处理。在Java中编写并发程序变得越来越容易,但是要编写出正确又快速的并发程序,则一向没那么简单。安全性和活性失败是并发编程中需要面对的问题,Stream pipeline并行也不例外。

请看摘自45条的这段程序:

public static void main(String[] args) {
        primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE) )
                .filter(mersenne -> mersenne.isProbablePrime(50))
                .limit(20)
                .forEach(System.out::println);

    }

    static Stream<BigInteger> primes(){
        return Stream.iterate( TWO,BigInteger::nextProbablePrime);
    }           

在我的机器上,这段程序会立即开始打印素数,玩完成运行花了12.5秒。假设我天真地想通过在Stream pipeline 上添加一个parallel() 调用来提速。你认为这样会对其性能产生什么样的影响呢?运行速度会稍微快一点点吗?还是会慢一点点?遗憾的是,其结果是根本不打印任何内容了,CPU的使用率却定在90%一动不动了(活性失败)。程序最后可能会终止,但是我不想一探究竟,半个小时后就强行把它终止了。

这是怎么回事呢?简单地说,Stream类库不知道如何并行这个pipeline,以及如何探索失败。即便在最佳环境下,如果源头是来自于Stream.iterator,或者使用了中间操作的limit,那么并行pipeline 也不可能提升性能。这个pipeline 必须同时满足这两个条件。更糟糕的是,默认的并行策略在处理limit 的不可预知性时,是假设额外多处理几个元素,并放弃任何不需要的结果,这些都不会影响性能。在这种情况下,它查找梅森素数时,所花费的时间大概是查找之前元素的两倍。因而,额外多计算一个元素的成本,大概相当于计算所有之前元素总和的时间,这个貌似无伤大雅的pipeline,却使得自动并行算法濒临崩溃。这个故事的寓意很简单:千万不要任意地并行Stream pipeline。它造成的性能后果有可能是灾难性的。

总之,在Stream 上通过并行获得的性能,最好是通过ArrayList、HashMap、HashSet 和ConcurrentHashMap 实例,数组,int 范围和long 范围等。这些数据结果的共性是,都可以被精确、轻松地分成任意大小的子范围,使并行线程中的分工变得更加轻松。Stream 类库用来执行这个任务的抽象是分割迭代器(spliterator),它是由Stream 和 Iterable 中的 spliterator方法返回。

这些数据结构共有e另一项重要特性是,在进行顺序处理时,它们提供了优异的引用局部性(locality of reference):序列化的元素引用一起保存在内存中。被那些引用访问到的对象在内存中可能不是一个紧挨着一个,这降低了引用局部性。事实证明,引用局部性对于并行批处理来说至关重要:没有它,线程就会出现闲置,需要等待数据从内存转移到处理器的缓存。具有最佳引用局部性的数据结构是基本类型数据数组,因为数据本身是相邻地保存在内存中的。

Stream pipeline 的终止操作本质上也影响了并发执行的效率。如果大量的工作在终止操作中完成,而不是全部工作在pipeline中完成,并且这个操作是固有的顺序,那么并行pipeline的效率就会受到限制。并行的最佳终止操作是做减法(reduction),用一个Stream的reduce方法,将所有从pipeline产生的元素都合并在一起,或者预先打包像min、max、count、和sum这类方法。骤死式操作(short-circuiting operation)如 anyMach、allMatch和noneMatch也都可以并行。由Stream的collect方法执行的操作,都是可变的减法,不是并行的的最好选择,因为合并集合的成本非常高。

如果是自己编写Stream 、Iterable或者Collection实现,并且想要得到适当的并行性能,就必须覆盖spliterator方法,并广泛地测试结果Stream 的并行性能。编写高质量的分隔迭代器很困难,并且超出了本书的讨论范畴。

并行Stream 不仅可能降低性能,包括活性失败,还可能导致结果出错,以及难以预计的行为(如安全性失败)。安全性失败可能是因为并行的pipeline使用了映射、过滤器或者程序员自己编写的其他函数对象,并且没有遵守他们的规范。Stream 规范对于这些函数对象有着严格的要求条件。例如,传到Stream的reduce操作的收集器函数和组合器函数,必须是有关联、互不干扰,并且是无状态的。如果不满足这些条件(在46条中提到了一些),但是按序列运行pipeline,可能会得到正确的结果;如果并发运行,则可能会突发性失败。

以上值得注意的是,并行的梅森素数程序虽然运行完成了,但是并没有按正确的顺序(升序)打印出来。为了保存序列化版本程序显示的顺序,必须用forEachOrdered代替终止操作的forEach,它可以确保按encounter顺序遍历并行的Stream。

假如在使用的是一个可以有效分隔的源Stream,一个可并行的或者简单的终止操作,以及互不干扰的函数对象,那么将无法获得通过并行实现的提速,除非pipeline完成了足够的实际工作,抵消了与并行相关的成本。据不完全统计,Stream中的元素数量是每个元素所执行的代码行数的很多倍,至少是十万倍[Lea 14]。

切记:并行Stream 是一项严格的性能优化。对于任何优化都必须在改变前后对性能进行测试,以确保值得这么做(item 67)。最理想的是在现实的系统设置中进行测试。一般来说,程序中所有并行Stream pipeline 都是在一个通用的fork-join池中运行得。只要有一个pipeline 运行异常,都会损害到系统中其他不相关部分的性能。

听起来貌似在并行Stream pipeline 时怪事连连,其实正是如此。我有个朋友,他发现在大量使用Stream 的几百万行代码中,只有少数几个并行Stream 是有效的。这并不意味着应该避免使用并行Stream 。在适当的条件下,给Stream pipeline 添加 parallel 调用,确实可以在多处理器核的情况下实现近乎线性的倍增。某些域如机器学习和数据处理,尤其适用于这样的提速。

简单举一个并行Stream pipeline 有效的例子。假设下面这个函数是用来计算 π(n) ,素数的数量少于或者等于n :

// Prime-counting stream pipeline - benefits from parallelization
    static long pi(long n) {
        return LongStream.rangeClosed(2,n)
                .mapToObj(BigInteger::valueOf)
                .filter(i -> i.isProbablePrime(50))
                .count();
    }           

在我的(作者)机器上,这个函数花31秒完成了计算 π(10^8) 。只要添加一个 parallel() 调用,就把调用时间减少到了9.2s

// Prime-counting stream pipeline -  parallel version
    static long pi(long n) {
        return LongStream.rangeClosed(2,n)
                .parallel()
                .mapToObj(BigInteger::valueOf)
                .filter(i -> i.isProbablePrime(50))
                .count();
    }           

换句话说,并行计算在我的四核机器上添加了parallel 调用后,速度加快了3.7倍。值得注意的是,这并不是在实践中计算n值很大时的 π(n) 的方法。还有更加高效的算法,如著名的Lehmer公式。

如果要并行一个随机数的Stream ,应该从SplittableRandom实例开始,而不是从ThreadLocalRandom(或实际上已经过时的Random)开始。SplittableRandom正是专门为此设计的,还有线性提速的可能。ThreadLocalRandom 则只用于单线程,它将自身当做一个并行的Stream 源运用到函数中,但是没有SplittableRandom 那么快。Random在每个操作上都进行同步,因此会导致滥用,扼杀了并行的优势。

总而言之,尽量不要并行Stream pipeline ,除非有足够的理由相信它能保证计算的正确性,并且能加快程序的运行速度。如果对Stream 进行不恰当的并行操作,可能导致程序运行失败,或者造成性能灾难。如果确信并行是可行的,并发运行时一定要确保代码正确,并在真实环境下认真地进行性能测量。如果代码正确,这些实验也证明它有助于提升性能,只有这时候,才可以在编写代码时并行Stream。