流(Stream)的基本介紹

Stream的操作步驟
-
建立Stream
一個資料源(如:集合、數組),擷取一個流
-
中間操作
一個中間操作鍊,對資料源的資料進行處理
-
終止操作(終端操作)
一個終止操作,執行中間操作鍊,并産生結果
Java8新特性之Stream的使用
建立Stream
public class Demo01 {
List<Person> people = Arrays.asList(
new Person("A", 24),
new Person("B", 44),
new Person("E", 53),
new Person("C", 53),
new Person("D", 19));
@Test
public void test01() {
// 1. 通過Collection集合提供的stream()
ArrayList<Object> list = new ArrayList<>();
Stream<Object> stream = list.stream();
// 2.通過Arrays中靜态方法擷取數組流
int arr[] = {1, 2, 3};
IntStream stream1 = Arrays.stream(arr);
// 3.通過Stream中的靜态方法
Stream<String> stream2 = Stream.of("aa", "bb", "cc");
// 4.建立無限流
// 疊代
Stream<Integer> stream3 = Stream.iterate(0, (x) -> x + 2);
stream3.limit(10);
stream3.forEach(System.out::println);
// 生成
Stream.generate(() -> Math.random()).limit(5).forEach(System.out::println);
}
}
中間操作
1.篩選與切片
@Test
public void test02() {
// 内部疊代
// 中間操作:不會執行任何操作
Stream<Person> stream = people.stream().
filter((e) -> e.age > 30);
// 終止操作:一次性執行全部内容
stream.forEach(System.out::println);
}
@Test
public void test03() {
people.stream()
.filter((e) -> e.age > 20)
// 取前兩個
.limit(2)
.forEach(System.out::println);
}
@Test
public void test04() {
people.stream()
.filter((e) -> e.age > 20)
// 跳過前兩個
.skip(2)
.forEach(System.out::println);
}
@Test
public void test05() {
people.stream()
.filter((e) -> e.age > 20)
.skip(2)
// 去重
.distinct()
.forEach(System.out::println);
}
2.映射
@Test
public void test06() {
List<String> list = Arrays.asList("aaa", "bbb", "ccc");
list.stream()
.map((str) -> str.toUpperCase())
.forEach(System.out::println);
System.out.println("提取Person名字");
people.stream().map(Person::getName)
.forEach(System.out::println);
}
3.排序
@Test
public void test07() {
// 排序
List<String> list = Arrays.asList("ddd", "aaa", "bbb", "ccc");
list.stream()
.sorted()
.forEach(System.out::println);
System.out.println("--------------------");
// 年齡一樣按照姓名排
people.stream()
.sorted((e1, e2) -> {
if (e1.age == e2.age) {
return e1.name.compareTo(e2.name);
} else {
return e1.age - e2.age;
}
})
.forEach(System.out::println);
}
終止操作
終止操作會從流水線生成結果。其結果可以是任何不是流的值,例如:List、Integer,甚至是void。
1.查找與比對
@Test
public void test08() {
// 查找與比對
boolean b = people.stream()
// 是否比對所有元素
.allMatch((e) -> e.name.equals("B"));
System.out.println(b);
System.out.println("----------------");
// 至少有一個比對
boolean b1 = people.stream().anyMatch((e) -> e.name.equals("B"));
System.out.println(b1);
System.out.println("----------------");
// 是否沒有比對所有元素
boolean b2 = people.stream().noneMatch((e) -> e.name.equals("B"));
System.out.println(b2);
System.out.println("-------------------------");
// 找出第一個,年齡最小的,最終結果有可能為空時
Optional<Person> first = people.stream().sorted(Comparator.comparing(e -> e.age))
.findFirst();
System.out.println(first.get());
System.out.println("-------------------------");
Optional<Person> optional = people.stream()
.filter((e) -> e.age > 30)
.findAny();
System.out.println(optional.get());
}
@Test
public void test09() {
long count = people.stream().count();
System.out.println(count);
// 擷取年齡最大
Optional<Person> max = people.stream().max(Comparator.comparingInt(e -> e.age));
System.out.println(max.get());
System.out.println("----------------------");
// 擷取最小名字
Optional<String> min = people.stream()
.map(Person::getName)
.min(String::compareTo);
System.out.println(min.get());
}
2.歸約
@Test
public void test10() {
// 歸約
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
Integer reduce = list.stream()
.reduce(0, (x, y) -> x + y);
System.out.println(reduce);
System.out.println("---------------");
// 年齡總和
Optional<Integer> op = people.stream()
.map(Person::getAge)
.reduce(Integer::sum);
System.out.println("年齡總和:" + op.get());
}
3.收集
Collector接口中方法的實作決定了如何對流執行收集操作(如收集到List、Set、Map)。但是Collectors實用類提供了很多靜态方法,可以友善地建立常見收集器執行個體,具體方法與執行個體如下表:
@Test
public void test11() {
// 收集
// 提取所有姓名
List<String> list = people.stream()
.map(Person::getName)
.collect(Collectors.toList());
System.out.println(list);
System.out.println("----------------");
HashSet<String> set = people.stream()
.map(Person::getName)
.collect(Collectors.toCollection(HashSet::new));
set.forEach(System.out::println);
}
@Test
public void test12() {
// 收集
Long set = people.stream()
.collect(Collectors.counting());
System.out.println(set);
// 年齡平均值
Double age = people.stream()
.collect(Collectors.averagingInt(Person::getAge));
System.out.println(age);
System.out.println("-------------------------");
IntSummaryStatistics agesum = people.stream()
.collect(Collectors.summarizingInt(Person::getAge));
System.out.println(agesum);
System.out.println("-------------------------");
// 擷取年齡字元最大
Optional<Person> name = people.stream().max(Comparator.comparing(e -> e.name));
System.out.println(name.get());
}
4.分組、分區
@Test
public void test13() {
// 分組
// 按照相同年齡分組
Map<Integer, List<Person>> map = people.stream()
.collect(Collectors.groupingBy(Person::getAge));
map.entrySet().forEach(System.out::println);
}
@Test
public void test14() {
// 多級分組
Map<Integer, Map<String, List<Person>>> map = people.stream()
.collect(Collectors.groupingBy(Person::getAge, Collectors.groupingBy((e) -> {
if (e.getAge() <= 35) {
return "青年";
} else {
return "中年";
}
})));
map.entrySet().forEach(System.out::println);
}
@Test
public void test15() {
// 分區
// 滿足條件一個區
Map<Boolean, List<Person>> map = people.stream()
.collect(Collectors.partitioningBy((e) -> e.age > 35));
map.entrySet().forEach(System.out::println);
}
@Test
public void test16() {
// 分區
// 滿足條件一個區
IntSummaryStatistics statistics = people.stream()
.collect(Collectors.summarizingInt(Person::getAge));
System.out.println(statistics.getAverage());
System.out.println(statistics.getCount());
System.out.println(statistics.getMax());
}
@Test
public void test17() {
// 分區
// 滿足條件一個區
String s = people.stream()
.map(Person::getName)
// 中間 首部 尾部
.collect(Collectors.joining(",", "<", ">"));
System.out.println(s);
}
并行流
并行流就是把一個内容分成多個資料塊,并用不同的線程分别處理每個資料塊的流。
Fork/Join 架構介紹
Fork/Join 架構:就是在必要的情況下,将一個大任務,進形拆分(fork)成若幹個小任務(拆到不可再拆時),再将一個個的小任務運作的結果進行join彙總。
Fork/Join 架構與傳統線程池的差別:
采用“工作竊取”模式(work-stealing):
當執行新的任務時,它可以将其拆分成更小的任務執行,并将小任務加到線程隊列中,然後再從一個随機線程的隊列中偷一個并把它放在自己的隊列中。
相對于一般的線程池實作,fork/join架構的優勢展現在對其中包含的任務的處理方式上.在一般的線程池中,如果一個線程正在執行的任務由于某些原因無法繼續運作,那麼該線程會處于等待狀态.而在fork/join架構實作中,如果某個子問題由于等待另外一個子問題的完成而無法繼續運作.那麼處理該子問題的線程會主動尋找其他尚未運作的子問題來執行.這種方式減少了線程的等待時間,提高了性能。
public class ForkJoinDemo extends RecursiveTask<Long> {
private static final long serialVersionUID = 13465648909L;
private long start;
private long end;
private static final long THRESHOLD = 10000;
public ForkJoinDemo(long start, long end) {
this.start = start;
this.end = end;
}
public static void main(String args[]) {
Instant start = Instant.now();
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0, 10000000L);
Long sum = pool.invoke(task);
System.out.println(sum);
Instant end = Instant.now();
System.out.println(Duration.between(start, end).toMillis());
}
protected Long compute() {
long length = end - start;
if (length <= THRESHOLD) {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
long mid = (start + end) / 2;
ForkJoinDemo left = new ForkJoinDemo(start, mid);
left.fork();
ForkJoinDemo right = new ForkJoinDemo(mid + 1, end);
right.fork();
return left.join() + right.join();
}
}
}
Java8并行流
Java 8 中将并行進行了優化,我們可以很容易的對資料進行并行操作。Stream API 可以聲明性地通過 parallel() 與 sequential() 在并行流與順序流之間進行切換。
@Test
public void test20() {
// 并行流
long l = LongStream.rangeClosed(0, 10000000L)
.parallel()
.reduce(0, Long::sum);
System.out.println(l);
}
Stream練習
@Test
public void test18() {
// 給定數字清單
// 傳回每個數字清單的平方清單
Integer[] integers = new Integer[]{1, 2, 3, 4, 5};
Arrays.stream(integers)
.map((x) -> x * x)
.forEach((i) -> System.out.print(i + " "));
}
@Test
public void test19() {
// 擷取people集合中數目
Optional<Integer> optional = people.stream()
.map((e) -> 1)
.reduce(Integer::sum);
System.out.println(optional.get());
}