Java8開始,在java.util.stream包下引入了Stream流及IntStream、LongStream、DoubleStream等數值流,這些流式接口支援順序和并行的聚合操作一系列元素。從原始資料的流動方向角度來分析,可以将流式操作分為三個階段:建構流、中間操作、終端操作,這些階段的操作在Java8提供了許多API。按照這個思路,整理并總結下Stream 相關的API。
目錄
一、建構流
二、中間操作
三、終端操作
1、周遊操作
2、歸集操作
3、組合操作
小結
一、建構流
比如Java集合進行流轉換,轉換方式有兩種:.stream()表示串行流,.parallelStream()表示并行流,串行流是使用一個線程順序的處理資料,而并行流則使用多個線程進行并發的處理資料操作。
Stream API提供的流建構方式有:
- Stream接口的靜态方式 —— empty()、concat()、of()、iterate()、generate()、Stream.builder()等;
- Stream接口的中間操作方式 —— filter()、distinct()、limit()、skip()、sorted()、peek()、map()、mapToXxx()、flatMap()、flatMapToXxx()等;
- 其他接口或類的方式 —— 如Arrays類的stream()、Files類lines()、Collection接口stream() 或parallelStream()等。
public class XxxTest {
/** 測試 Stream接口的靜态方式 和 其他接口或類的stream()方法 */
public static void main(String[] args) throws IOException {
// 傳回一個空的順序Stream
Stream<Object> emptyStream = Stream.empty();
// 傳回其元素是指定值的順序排序Stream
Stream<String> stringStream = Stream.of("a", "b", "c");
// 傳回疊代流(起始元素為1,每次+2遞增)
Stream<Double> iterateStream = Stream.iterate(1.0, n -> n + 2.0);
// 随機生成無限流
Stream<Double> generateStream = Stream.generate(Math::random);
// 建立一個懶惰連接配接的流,其元素是第一個流的所有元素,後跟第二個流的所有元素
Stream<Double> concatStream = Stream.concat(iterateStream, generateStream);
// 建立可變建構器,通過add()方法将元素加入Stream
Stream.Builder<String> builder = Stream.builder();
builder.add("weixiangxiang");
builder.add("xxwei3");
// 通過檔案類生成Stream
Stream<String> filesStream = Files.lines(Paths.get("data.txt"));
// 通過數組類生成Stream
DoubleStream doubleStream = Arrays.stream(new double[]{3.14, 0.89, 7.56, 3.25});
// 通過集合接口生成Stream
List<String> list = Arrays.asList("習友路", "翡翠路", "習友路", "創新大道", "南北一号高架", "天柱路");
Stream<String> listStream = list.stream(); // 順序流
Stream<String> listParallelStream= list.parallelStream(); // 并行流
}
}
二、中間操作
在上面提及到了許多stream流的中間操作方法API,以示例中的List集合為例來 一 一 分析:
filter(Predicate<? super T> predicate)
- 含義:傳回由與此給定謂詞比對的此流的元素組成的流,換句話說 —— 傳回一個根據給定條件過濾的元素組成的流;
- 示例:Stream<String> filterStream = listStream.filter(str -> str.contains("路")); 。
distinct()
- 含義:傳回由該流的不同元素(根據 Object.equals(Object) )組成的流,換句話說 —— 去重;
- 示例:Stream<String> distinctStream = listStream.distinct(); 。
limit(long maxSize) 與 skip(long n)
- 含義:簡單的說 —— limit()選擇前maxSize個元素,skip()跳過前n個元素,二者搭配常用于分頁;
- 示例:以項目中分頁資料處理場景為例 。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnL4gzM5MTMxITMwMjMxAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
sorted() 與 sorted(Comparator<? super T> comparator)
- 含義:無參,傳回由此流的元素組成的流,根據自然順序排序。有參,傳回由該流的元素組成的流,根據提供的 Comparator進行排序;
- 示例:Stream<String> sortedStream = llistStream.sorted(); 。
peek(Consumer<? super T> action)
- 含義:傳回由該流的元素組成的流,另外在從生成的流中消耗元素時對每個元素執行提供的操作,換句話說 —— 改變流中元素的狀态 或維持元素的現狀,與map()方法功能相似;
- 示例:Stream<String> peekStream = listStream.peek(System.out::println); // 比如隻列印不做任何操作 。
map(Function<? super T,? extends R> mapper)
- 含義:傳回由給定函數應用于此流的元素的結果組成的流,換句話說 —— 将流中的每一個元素T映射為元素R;
- 示例:Stream<String> mapStream = listStream.map(str -> str.concat("-1")); 。
mapToXxx(ToXxxFunction<? super T,? extends R> mapper)
- 含義:傳回一個 XxxStream ,其中包含将給定函數應用于此流的元素的結果, Xxx有int、long、double類型;
- 用于映射不同類型元素。
flatMap(Function<? super T,? extends Stream<? extends R>> mapper)
- 含義:傳回由通過将提供的映射函數應用于每個元素而産生的映射流的内容來替換該流的每個元素的結果的流,換句話說 —— 将流中的每一個元素T映射為一個流SR;
- 示例:
public class XxxTest {
public static void main(String[] args) throws IOException {
// flatMap方式
List<String> flatMapStream1 = listStream
.flatMap(s ->Arrays.stream(s.split(";"))).collect(Collectors.toList());
// map + flatMap方式
List<String> flatMapStream2 = listStream
.map(s -> s.split(";")).flatMap(Arrays::stream).collect(Collectors.toList());
}
}
flatMapToXxx(Function<? super T,? extends Stream<? extends R>> mapper)
- 含義:傳回一個 IntStream ,其中包含将該流的每個元素替換為通過将提供的映射函數應用于每個元素而産生的映射流的内容的結果, Xxx有int、long、double類型;
- 用于映射不同類型的流。
此外,需要注意還有兩類方法:findXxx()、xxxMatch()
- findXxx():尋找流中的元素,如果有多個相同的元素時,findAny傳回一個不一定是第一個,findFirst尋找第一個;
- xxxMatch():流中的元素是否比對滿足條件(all-所有元素,any-任意一個,none-無)。
public class XxxTest {
public static void main(String[] args) throws IOException {
// findXxx
Optional<Integer> any = integerList.stream().filter(a -> a == 88).findAny();
Optional<Integer> first = integerList.stream().filter(a -> a == 88).findFirst();
logger.info("orElse方法賦預設值,避免使用get方法出現空指針異常:" + any.orElse(null));
// xxxMatch
boolean allMatch = integerList.stream().allMatch(a -> a > 0);
boolean anyMatch = integerList.stream().anyMatch(a -> a == 89);
boolean noneMatch = integerList.stream().noneMatch(a -> a > 90);
}
}
小結一下:
以上總結的API是生成Stream流的中間操作方法,而中間操作都有着一個共同點 —— 惰性!也就是說自Stream流建構起,經過中間操作對元素進行處理後,如果沒有後續的終端操作的話,任何的中間操作都不會産生預期的結果,例如Stream<String> peekStream = listStream.peek(System.out::println); ,這段代碼不會按照預期列印流中的每個元素。
對于常見的資料處理:如求和、求均值、求最值、統計元素數量等,demo示範下Stream流是如何操作的:(暫不考慮使用Collectors接口的方法實作,Collectors接口方法的用法後面再說~)
public class XxxTest {
public static void main(String[] args) throws IOException {
List<Milk> milkList = new ArrayList<>();
milkList.add(new Milk("0", "伊利老酸奶", "mn10001", 5.0));
milkList.add(new Milk("1", "蒙牛純牛奶", "mn10001", 4.2));
milkList.add(new Milk("2", "蒙牛兒童奶", "mn10002", 6.0));
milkList.add(new Milk("3", "伊利豆奶", "mn10002", 4.0));
// count()
logger.info(String.valueOf(milkList.stream().count()));
// max()
Optional<Milk> maxOptional = milkList.stream().max(Comparator.comparing(Milk::getMilkPrice));
maxOptional.ifPresent(x -> System.out.println(x));
// min()
Optional<Milk> minOptional = milkList.stream().min(Comparator.comparingDouble(Milk::getMilkPrice));
minOptional.ifPresent(x -> System.out.println(x.getMilkPrice()));
// mapToXxx() + sum()
double sum3 = milkList.stream().mapToDouble(Milk::getMilkPrice).sum();
// map() + reduce(Xxx帶有的sum())
Double sum4 = milkList.stream().map(Milk::getMilkPrice).reduce(Double::sum).get();
// mapToXxx() + average()
OptionalDouble a2 = milkList.stream().mapToDouble(Milk::getMilkPrice).average();
// XxxStream 用數值流求和、求最值,求均值等
DoubleStream doubleStreams = milkList.stream().mapToDouble(Milk::getMilkPrice);
logger.info("求和:" + doubleStreams.sum());
// 數值流隻能使用一次,先求和再求其他的,會報錯:stream has already been operated upon or closed
// logger.info("求均值:" + doubleStreams.average());
// logger.info("最大值:" + doubleStreams.max());
// logger.info("最小值:" + doubleStreams.min());
// 對應IntStream 與 LongStream 擁有 range 和 rangeClosed 方法用于數值範圍處理
int intSum = IntStream.range(1, 100).sum();
double intAverage = IntStream.range(1, 100).average().getAsDouble();
}
}
class Milk implements Serializable {
public String milkId;
public String milkName;
public String milkNbr;
public Double milkPrice;
public Milk() {
}
public Milk(String milkId, String milkName, String milkNbr, Double milkPrice) {
this.milkId = milkId;
this.milkName = milkName;
this.milkNbr = milkNbr;
this.milkPrice = milkPrice;
}
/** 省略get/set/toString方法... */
}
三、終端操作
所謂終端操作就是流的結尾操作,常見的終端操作有:周遊操作foreach()、歸集操作collect()、組合操作reduce()等。
1、周遊操作
- forEach(Consumer<? super T> action) :對此流的每個元素執行操作。
- forEachOrdered(Consumer<? super T> action) :如果流具有定義的遇到順序,則以流的遇到順序對該流的每個元素執行操作。
最常用的則是第一種操作,demo示範如下:
public class XxxTest {
public static void main(String[] args) throws IOException {
List<String> list = Arrays.asList("翡翠路", "習友路", "創新大道", "南北一号高架", "天柱路");
Stream<String> listStream = list.stream();
Stream<String> filterStream = listStream.filter(str -> str.contains("路"));
filterStream.forEach(str -> System.out.println(str));
// 使用鍊式
list.stream().filter(str -> str.contains("路")).forEach(str -> System.out.println(str));
}
}
2、歸集操作
- collect(Collector<? super T,A,R> collector):使用 Collectors接口對此流的元素執行歸集操作。
- collect(Supplier<R> supplier, BiConsumer<R,? super T> accumulator, BiConsumer<R,R> combiner) :對此流的元素執行歸集操作。
最常用的是第一種操作,其中Collectors接口靜态方法有:(這些API非常重要!!!Xxx代表不同基本類型,有int、double、long)
- 轉集合或映射操作:toList()、toSet()、toMap()、toConcurrentMap()、toCollection()、collectingAndThen等;
- 字元串連接配接操作:joining();
- 分組操作:有簡單分組、多級分組、分組求和 ,分組後傳回Map類型,key為分組類别,groupingBy()、summingXxx()等;
- 分區操作:按照true和false來分的,partitioningBy();
- 統計流中元素數量:counting();
- 求最值操作:maxBy()、minBy();
- 求和操作:summingXxx()、summarizingXxx() + getSum() 等;
- 求均值操作:averagingXxx();
- 其他:mapping()、reduce()等
這些重要API的使用demo示範如下:
public class XxxTest {
public static void main(String[] args) throws IOException {
List<String> list = Arrays.asList("翡翠路", "習友路", "創新大道", "南北一号高架", "天柱路");
// 預設轉成ArrayList
List<String> newList = list.stream().map(str -> str.concat("-1")).collect(Collectors.toList());
// 預設轉成hashSet
Set<String> newSet = list.stream().map(str -> str.concat("-1")).collect(Collectors.toSet());
List<Milk> milkList = new ArrayList<>();
milkList.add(new Milk("0", "伊利老酸奶", "mn10001", 5.0));
milkList.add(new Milk("1", "蒙牛純牛奶", "mn10001", 4.2));
milkList.add(new Milk("2", "蒙牛兒童奶", "mn10002", 6.0));
milkList.add(new Milk("3", "伊利豆奶", "mn10002", 4.0));
// milkList.add(new Milk("3", "伊利豆奶", "mn10002", 4.0)); //去重測試使用
// 指定k-v
Map<String, Milk> mapCollect1 = milkList.stream()
.collect(Collectors.toConcurrentMap(Milk::getMilkId, Milk -> Milk));
// 指定k-v
Map<String, Milk> mapCollect2 = milkList.stream()
.collect(Collectors.toMap(Milk::getMilkId, Function.identity()));
// 為了防止key重複,傳入一個合并的函數來解決key沖突問題
Map<String, Milk> mapCollect3 = milkList.stream()
.collect(Collectors.toMap(Milk::getMilkId, Function.identity(), (k1, k2) -> k1));
// 拼接多個參數作為key
Map<String, Milk> mapCollect4 = milkList.stream()
.collect(Collectors.toMap(k -> k.getMilkId() + k.getMilkNbr(), Function.identity(), (k1, k2) -> k1));
// 字元串連接配接 joining
logger.info(milkList.stream().map(Milk::getMilkName).collect(Collectors.joining()));
logger.info(milkList.stream().map(Milk::getMilkName).collect(Collectors.joining(",")));
logger.info(milkList.stream().map(Milk::getMilkName).collect(Collectors.joining("和","勇哥,",",xxw")));
// 分組 groupingBy
Map<Double, List<Milk>> simpleMap = milkList.stream().collect(Collectors.groupingBy(Milk::getMilkPrice));
simpleMap.forEach((k, v) -> logger.info(k + ":" + v));
Map<String, Map<Double, List<Milk>>> moreMap = milkList.stream()
.collect(Collectors.groupingBy(Milk::getMilkNbr, Collectors.groupingBy(Milk::getMilkPrice)));
moreMap.forEach((k, v) -> logger.info(k + ":" + v));
Map<String, Double> sumMap = milkList.stream()
.collect(Collectors.groupingBy(Milk::getMilkNbr, Collectors.summingDouble(Milk::getMilkPrice)));
sumMap.forEach((k, v) -> logger.info(k + ":" + v));
// 分區:按照true和false來分的,如Milk按價格分區 ,支援多級分區 partitioningBy
Map<Boolean, List<Milk>> booleanListMap = milkList.stream()
.collect(Collectors.partitioningBy(p -> p.getMilkPrice() >= 5.0));
//false:[...] true:[...]
booleanListMap.forEach((k, v) -> logger.info(k + ":" + v));
Map<Boolean, Map<Boolean, List<Milk>>> moreBooleanListMap = milkList.stream()
.collect(Collectors.partitioningBy(p -> p.getMilkPrice() >= 5.0, Collectors
.partitioningBy(x -> x.getMilkName().startsWith("蒙牛"))));
//false:{false=[...], true=[...]} true:{false=[...], true=[...]}
moreBooleanListMap.forEach((k, v) -> logger.info(k + ":" + v));
// 去重方法:id唯一+distinct()、id唯一+TreeSet
List<String> distincts = milkList.stream().map(Milk::getMilkId).distinct().collect(Collectors.toList());
List<Milk> sets = milkList.stream().collect(Collectors.collectingAndThen(Collectors
.toCollection(() -> new TreeSet<>(Comparator.comparing(Milk::getMilkId))), ArrayList::new));
sets.forEach(x -> logger.info(String.valueOf(x)));
// counting
logger.info(String.valueOf(milkList.stream().collect(Collectors.counting())));
// summarizingXxx
double sum1 = milkList.stream().collect(Collectors.summarizingDouble(Milk::getMilkPrice)).getSum();
// summingXxx
Double sum2 = milkList.stream().collect(Collectors.summingDouble(Milk::getMilkPrice));
// averagingXxx
Double a1 = milkList.stream().collect(Collectors.averagingDouble(Milk::getMilkPrice));
// maxBy
Optional<Milk> maxByOptional = milkList.stream()
.collect(Collectors.maxBy(Comparator.comparing(Milk::getMilkPrice)));
maxByOptional.ifPresent(System.out::println);
// minBy
Optional<Milk> minByOptional = milkList.stream()
.collect(Collectors.minBy(Comparator.comparingDouble(Milk::getMilkPrice)));
System.out.println(minByOptional.get().getMilkPrice());
}
}
3、組合操作
- reduce(BinaryOperator<T> accumulator) :
- reduce(T identity, BinaryOperator<T> accumulator)
- reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)
組合操作可以簡單了解為:對流中的元素按照一定規則進行組合,生成需要的組合結果,如使用reduce()将流元素組合起來進行求和,求最值,求平均等操作,同時需要注意串行計算和并行計算的不同,用demo示範說明:
public class XxxTest {
public static void main(String[] args) throws IOException {
List<Milk> milkList = new ArrayList<>();
milkList.add(new Milk("0", "伊利老酸奶", "mn10001", 5.0));
milkList.add(new Milk("1", "蒙牛純牛奶", "mn10001", 4.2));
milkList.add(new Milk("2", "蒙牛兒童奶", "mn10002", 6.0));
milkList.add(new Milk("3", "伊利豆奶", "mn10002", 4.0));
Optional<Double> reduce1 = milkList.stream().map(Milk::getMilkPrice).reduce((a, b) -> a + b);
Optional<Double> reduce2 = milkList.stream().map(Milk::getMilkPrice).reduce(Double::max);
logger.info("求和:" + reduce1.get() + ",最值為:" + reduce2.get()); //23.0 6.0
Double reduce3 = milkList.stream().map(Milk::getMilkPrice).reduce(0.0, (a, b) -> a + b);
Double reduce4 = milkList.parallelStream().map(Milk::getMilkPrice).reduce(0.0, (a, b) -> a + b);
logger.info("指定初始值0,串行的求和:" + reduce3 + ",并行求和:" + reduce4); // 23.0 23.0
Double reduce5 = milkList.stream().map(Milk::getMilkPrice).reduce(10.0, (a, b) -> a + b);
Double reduce6 = milkList.parallelStream().map(Milk::getMilkPrice).reduce(10.0, (a, b) -> a + b);
// 注意并行計算時,每個線程的初始累加值都是10,最後5個線程加出來的結果就是73.0
logger.info("指定非0初始值,串行的求和:" + reduce5 + ",并行求和:" + reduce6); // 33.0 73.0
Double reduce7 = milkList.stream().map(Milk::getMilkPrice).reduce(10.0, (a, b) -> a + b, Double::sum);
Double reduce8 = milkList.parallelStream().map(Milk::getMilkPrice).reduce(10.0, (a, b) -> a + b, Double::sum);
logger.info("指定非0初始值,串行的求和:" + reduce7 + ",并行求和:" + reduce8);// 33.0 73.0
}
}
小結
至此,按照原始資料在Stream流中的流動方向整理和分析了如何建構流、如何進行相關的中間操作及如何進行相關的終端操作,在這些過程中涉及的API基本都是Java8新增的,學習和使用它們是Java開發必備的基礎。當然,我還發現一個有趣的現象,比如進行常見的求值運算,Stream API提供的實作方式就有很多種,還有專門的數值流可以操作,裡面或多或少存在一些細節問題(如Stream流可以反複使用而數值流隻能使用一次、調用reduce()進行并行計算和串行計算得到的結果不同 等等),需要在實際中使用後才能感受得到。
經過這一通整理和分析花費了不少時間,但也加深了對Stream API的了解,還是那句話吧,不積矽步無以至千裡,點滴付出終将有所收獲,共同進步吧 ~