Java 8 的新特性Stream,以及Java7的Fork/Join架構
文章目錄
- Java 8 的新特性Stream,以及Java7的Fork/Join架構
-
- 一、流
- 二、流的操作
-
- 2.1 篩選、去重、截斷、跳過
- 2.2 映射:map、flatMap
- 2.3 查找和比對
- 2.4 歸約 reduce
- 2.5 數值流
- 2.6 建立流
- 三、流的收集器
-
- 3.1 `collect`、`Collector`、`Collectors` 的差別
- 3.2 歸約和彙總
- 3.3 分組
- 3.4 分區
- 3.5 收集器接口
-
- (1) `Collector` 接口
- (2) 使用stream.collect(supplier, accumulator, combiner) 的重載方法
- 3.6 并行資料處理與性能高
-
- (1)并行流與串行流的轉化
- (2)預設并行流線程池:`ForkJoinPool`
- (3) 并行流的反例
-
- 反例一:使用不易并行化的操作導緻,性能變差
- 反例二:使用共享變量導緻結果錯誤
- (4)何時使用并行流
- 3.7 分支/合并架構
-
- (1)建立`RecursiveTask ` 或`RecursiveAction`的子類
- (2)調用`new ForkJoinPool().invoke()`
- (3)使用分支合并架構的注意事項
- 3.8 Spliterator 可分疊代器
- 其他、Optional和OptionalInt
一、流
- 流像一個延遲建立的集合
- 流隻能消費一次
- 流利用了内部疊代
二、流的操作
流的操作分為:中間操作、終端操作。
流的延遲特性:中間操作可以連成一個操作流水線,除非發出一個終端操作,中間操作不會執行任何處理。
利用延遲特性:短路、循環合并。
2.1 篩選、去重、截斷、跳過
-
filter()
-
根據 hashCode() 和 equals() 去除distinct()
-
截斷limit()
-
,跳過前面n個元素,和limit() 截然相反skip()
2.2 映射:map、flatMap
- map 映射: 根據一個元素,建立出一個新的元素
- flatMap 扁平化:把一個流中的每一個值都換成另一個流,然後把所有流連接配接起來成為一個流。
String[] words = {"hello", "world"};
// [h, e, l, l, o, w, o, r, l, d]
List<String> charList = Arrays.stream(words).map(item -> item.split("")).flatMap(Arrays::stream).collect(Collectors.toList());
題目:
給定兩個數字清單,如何傳回所有的數對呢?例如,給定清單[1, 2, 3]和清單[3, 4],應
該傳回[(1, 3), (1, 4), (2, 3), (2, 4), (3, 3), (3, 4)]。為簡單起見,你可以用有兩個元素的數組來代
表數對。
Integer[][] aa = {{1, 2, 3},{3, 4}};
List<Integer[]> pairs = Arrays.stream(aa[0])
.flatMap(i -> Arrays.stream(aa[1])
.map(j -> new Integer[]{i, j})
).collect(Collectors.toList());
2.3 查找和比對
比對
-
檢查是否至少比對一個元素,傳回booleananyMatch()
-
檢查是否比對所有元素,傳回booleanallMatch()
-
流中沒有任何元素與給定的判斷比對noneMath()
查找
-
傳回任意一個findAny()
-
查找第一個元素findFirst()
2.4 歸約 reduce
求和:
List<Integer> a = new ArrayList<>(Arrays.asList(3, 4, 5, 6, 8));
Integer res = a.stream().reduce(0, (i, j) -> i + j);
Integer res1 = a.stream().reduce(0, Integer::sum);
Optional<Integer> res2 = a.stream().reduce(Integer::sum);
System.out.println(res2.orElse(0));
最大值和最小值:
Optional<Integer> maxOptional = a.stream().reduce(Integer::max);
System.out.println(maxOptional.orElse(0));
Optional<Integer> minOptional = a.stream().reduce(Integer::min);
System.out.println(minOptional.orElse(0));
使用流中内置的方法統計個數:
long count = a.stream().count();
2.5 數值流
- 原始類型流特化:避免裝箱成本。IntStream、DoubleStream、LongStream。
- 映射為數值流: mapToInt() ,傳回一個IntStream
- 轉回對象流:intStream.boxed()
- 數值範圍:通過數值流, intStream.range() 和 intStream.rangeClosed()
2.6 建立流
- 由值建立流
Stream<Integer> stream = Stream.of(2, 3, 4, 5, 6);
- 由數組建立流
Stream<Integer> stream1 = Arrays.stream(new Integer[]{3, 5, 6, 7});
- 由檔案生成流
// 由檔案生成流 String filePath ="pom.xml"; try (Stream<String> fileStream = Files.lines(Paths.get(filePath), Charset.defaultCharset())){ long count = fileStream.flatMap(line -> Arrays.stream(line.split(" "))).count(); System.out.println(count); } catch (IOException e) { e.printStackTrace(); }
- 由函數生成流:建立無限流
-
Stream.iterate()
-
Stream.generate()
generate 生成// 列印前十個偶數 Stream.iterate(0, n->n+2).limit(10).forEach(System.out::println); // 斐波那契數列 Stream.iterate(new Integer[]{0, 1}, a->new Integer[]{a[1], a[0]+a[1]}) .limit(10).map(a->a[0]).forEach(k-> System.out.print(k+ " "));
Stream.generate(Math::random).limit(5).forEach(System.out::println); // generate 産生斐波那契序列 IntStream.generate(new IntSupplier() { private int pre = 0; private int cur = 1; @Override public int getAsInt() { int oldPre = this.pre; int nexVal = this.pre + this.cur; this.pre = this.cur; this.cur = nexVal; return oldPre; } }).limit(10).forEach(k-> System.out.print(k+ " "));
-
三、流的收集器
3.1 collect
、 Collector
、 Collectors
的差別
collect
Collector
Collectors
-
觸發歸約操作collect()
-
是接口。其方法的實作決定了如何對流進行歸約Collector
-
提供了很多靜态方法,可以很友善的建立常見的收集器;Collectors
3.2 歸約和彙總
-
統計數量Collectors.counting()
Long count = list.stream().collect(Collectors.counting()); long count1 = list.stream().count();
-
和Collectors.maxBy()
最大值和最小值Collectors.minBy()
Optional<String> maxOpt = list.stream().collect(Collectors.maxBy(String::compareTo)); Optional<String> minOpt = list.stream().collect(Collectors.minBy(String::compareTo));
- 彙總,求和、求平均
-
Collectors.summingInt()
-
Collectors.summingDouble()
-
Collectors.summingLong()
-
Collectors.averagingInt()
-
Collectors.averagingDouble()
-
Collectors.averagingLong()
Integer sum = Arrays.stream(a).collect(Collectors.summingInt(Integer::intValue)); Double avg = Arrays.stream(a).collect(Collectors.averagingInt(Integer::intValue));
-
- 同時獲得:最大值、最小值、平均值、和
-
Collectors.summarizingInt()
-
Collectors.summarizingDouble()
-
Collectors.summarizingLong()
IntSummaryStatistics summaryStatistics = Arrays.stream(a).collect(Collectors.summarizingInt(Integer::intValue));
-
- 連接配接字元:
Collectors.joining()
String res = list.stream().collect(Collectors.joining()); String res2 = list.stream().collect(Collectors.joining(","));
- 廣義的歸約彙總:
Collectors.reducing()
3.3 分組
-
Collectors.groupingBy()
Map<Integer, List<Integer>> collect = Arrays.asList(a).stream().collect(Collectors.groupingBy(Integer::intValue)); Map<String, List<Integer>> collect2 = Arrays.asList(a).stream().collect(Collectors.groupingBy(item -> { if (item > 3) { return "aa"; } else { return "bb"; } }));
-
Collectors.groupingBy(Dish::getName(), Collectors.groupingBy(Dish::getType()))
- 傳遞給第一個groupingBy() 的第二個收集器可以是任何類型,而不一定是groupingBy()
-
Collectors.groupingBy(Dish::getName(), Collectors.counting())
Map<String, Long> collect1 = Arrays.asList(a).stream().collect(Collectors.groupingBy(item -> { if (item > 3) { return "aa"; } else { return "bb"; } }, Collectors.counting()));
-
-
将收集器的結果轉換成另外一種結果collectingAndThen
Map<String, Dish> map1 = dishList.stream().collect(Collectors.groupingBy(Dish::getType, Collectors.collectingAndThen(Collectors.maxBy(Comparator.comparing(Dish::getName)), Optional::get)));
3.4 分區
-
分組的特殊情況,将資料分為 true和false兩組。Collectors.partitioningBy()
// 判斷一個數值是否為質數
private static boolean isPrime(int n) {
return IntStream.range(2, n).noneMatch(v-> n % v==0);
}
// 判斷一個數值是否為質數 優化
private static boolean isPrime2(int n) {
int sqrt = (int)Math.sqrt(n);
return IntStream.rangeClosed(2, sqrt).noneMatch(v-> n % v==0);
}
3.5 收集器接口
(1) Collector
接口
Collector
Collector<T,A,R>
: T 流的類型,A 為累加器類型,R為收集操作得到的類型;
-
: 建立新的結果容器; 建立空的累加器執行個體,供資料收集過程中使用;supplier()
-
: 累加器accumulator()
-
: 将累加器對象轉換成為整個集合操作的最終結果;finisher()
-
: 合并兩個結果容器combiner
-
: 傳回一個不可變得Characterististics集合,定義了收集器的行為characterististics
- UNORDERED 歸約結果不受流中項目的周遊和累積順序的影響;
- CONCURRENT 并行歸約流,它僅在用于無序資料源時才可以并行歸約;
- IDENTIFY_FINISH 表明直接将累加器的結果作為最終結果
第一個示例:ToListCollector
第二個案例:擷取質數 , 及對比 對比兩種擷取質數的方法
(2) 使用stream.collect(supplier, accumulator, combiner) 的重載方法
ArrayList<String> res = list.stream().collect(ArrayList::new, List::add, List::addAll);
3.6 并行資料處理與性能高
(1)并行流與串行流的轉化
parallel
, 将串行流轉化成并行流;
sequential
,将并行流轉化成并行流;
parallel和sequential可以結合使用,在内部實際上是一個boolean辨別,最後一次的parallel或sequential調用會影響整個流水線。
(2)預設并行流線程池: ForkJoinPool
ForkJoinPool
- 預設線程數的數量就是處理器數量:
availableProcessors 看起來是處理器,實際上傳回的是可用核心的數量,包括超線程生成的虛拟核心。Runtime.getRuntime().availableProcessors()
- 可以通過系統屬性,進行全局設定線程數:(沒有好的理由,強烈不建議修改)
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "5");
(3) 并行流的反例
反例一:使用不易并行化的操作導緻,性能變差
在下面的案例中,在實測中發現,此處的并行流比串行流慢很多。如果采用不易并行化的操作,可能會讓程式的整體性能變得更差。
// 串行流
public long sequentialSum2(long n) {
return LongStream.iterate(1l, i->i+1l).limit(n).reduce(0l, Long::sum);
}
// 并行流
public long parallelSum(long n) {
return LongStream.iterate(1l, i->i+1l).limit(n)
.parallel()
.reduce(0l, Long::sum);
}
将上面案例改為使用 range , 将得到并行的效率高于串行:
// 串行
public long sequentialSum3(long n) {
return LongStream.rangeClosed(1l, n).limit(n).reduce(0l, Long::sum);
}
// 并行
public long parallelSum3(long n) {
return LongStream.rangeClosed(1l, n).limit(n)
.parallel()
.reduce(0l, Long::sum);
}
反例二:使用共享變量導緻結果錯誤
class Accumulator {
public long total = 0;
public void add(long value) {
total += value;
}
}
public long sideEffectParallelSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1l, n).parallel().forEach(accumulator::add);
return accumulator.total;
}
(4)何時使用并行流
- 留意裝箱;
- 需要流中的n個元素,而不是前n個;
- 估算:N個元素,處理每個元素的成本為Q,Q越大,并行處理的性能好的可能行更大;
- 對于較小的資料量,不建議使用并行流;
- 考慮資料結構是否更容易分解;
- 考慮并行流處理的合并時的成本;
可分解性:
資料結構 | 可分解性 |
---|---|
ArrayList | 極好 |
LinkedList | 極差 |
IntStream.range | 極好 |
Stream.iterator | 極好 |
HashSet | 好 |
TreeSet | 好 |
3.7 分支/合并架構
它是ExecutorService 接口的實作。把子任務配置設定給線程池(ForkJoinPool)。
分治算法的并行版本。
(1)建立 RecursiveTask<R>
或 RecursiveAction
的子類
RecursiveTask<R>
RecursiveAction
隻需要實作其唯一的抽象方法
compute
:
- 定義将任務拆分為子任務的邏輯;
- 以及無法再拆分時,生成單個子任務結果的邏輯;
if (任務足夠小或不可分){
順序計算該任務
}else {
将任務分成兩個子任務
遞歸調用本方法,拆分每個子任務,等待所有子任務完成
合并每個子任務的結果
}
(2)調用 new ForkJoinPool().invoke()
new ForkJoinPool().invoke()
在實際應用中,使用多個ForkJoinPool是沒有意義的。一般來說,将其執行個體化一次,然後把執行個體儲存在靜态字段中,使之成為單例。
一個ForkJoinPool的案例:MyRecursiveSumTask
(3)使用分支合并架構的注意事項
-
方法會阻塞調用方,是以有必要在兩個子任務的就算都開始之後再調用它;join()
- 不應該在
内部調用RecursiveTask
的ForkJoinPool()
方法,應該始終調用invoke
或fork()
方法compute()
- 任務調用
方法可以把它排進fork()
, 左右兩個任務同時調用ForkJoinPool()
的效率 比 讓其中一個任務調用fork()
的效率低,這樣做可以為其中一個子任務重用同一線程,避免線上程池多配置設定一個任務造成開銷;compute()
- 分支/合并架構需要“預熱”或者說要執行幾遍才會被JIT編譯器優化;
-
劃分成許多小任務而不是大任務,更有助于工作線程的負載平衡;
這其中使用了“工作竊取算法”:每個線程都有雙端隊列(儲存配置設定給該線程的任務),某一個線程任務隊列空了,會去其他任意一個線程任務隊列中的末尾取一個任務。
3.8 Spliterator 可分疊代器
案例,建立一個切分字元串的Spliterator:MyWordCountSpliterator
其他、Optional和OptionalInt
-
對象流傳回值OPtional
-
數值流傳回值OptionalInt