Stream如何提高周遊集合效率?
上一講中,我在講 List 集合類,那我想你一定也知道集合的頂端接口 Collection。在 Java8 中,Collection 新增了兩個流方法,分别是 Stream() 和 parallelStream()。 通過英文名不難猜測,這兩個方法肯定和 Stream 有關,那進一步猜測,是不是和我們熟悉 的 InputStream 和 OutputStream 也有關系呢?集合類中新增的兩個 Stream 方法到底有 什麼作用?今天,我們就來深入了解下 Stream。
什麼是 Stream?
現在很多大資料量系統中都存在分表分庫的情況。例如,電商系統中的訂單表,常常使用使用者 ID 的 Hash 值來實作分表分庫,這樣是為了減 少單個表的資料量,優化使用者查詢訂單的速度。 但在背景管理者稽核訂單時,他們需要将各個資料源的資料查詢到應用層之後進行合并操 作。
例如,當我們需要查詢出過濾條件下的所有訂單,并按照訂單的某個條件進行排序,單個資料源查詢出來的資料是可以按照某個條件進行排序的,但多個資料源查詢出來已經排序好的資料,并不代表合并後是正确的排序,是以我們需要在應用層對合并資料集合重新進行排 序。
在 Java8 之前,我們通常是通過 for 循環或者 Iterator 疊代來重新排序合并資料,又或者通過重新定義 Collections.sorts 的 Comparator 方法來實作,這兩種方式對于大資料量系統來說,效率并不是很理想。
Java8 中添加了一個新的接口類 Stream,他和我們之前接觸的位元組流概念不太一樣, Java8 集合中的 Stream 相當于進階版的 Iterator,他可以通過 Lambda 表達式對集合進 行各種非常便利、高效的聚合操作(Aggregate Operation),或者大批量資料操作 (Bulk Data Operation)。 Stream 的聚合操作與資料庫 SQL 的聚合操作 sorted、filter、map 等類似。
我們在應用 層就可以高效地實作類似資料庫 SQL 的聚合操作了,而在資料操作方面,Stream 不僅可 以通過串行的方式實作資料操作,還可以通過并行的方式處理大批量資料,提高資料的處理 效率。
接下來我們就用一個簡單的例子來體驗下 Stream 的簡潔與強大。 這個 Demo 的需求是過濾分組一所中學裡身高在 160cm 以上的男女同學,我們先用傳統 的疊代方式來實作,代碼如下:

Stream 如何優化周遊?
上面我們初步了解了 Java8 中的 Stream API,那 Stream 是如何做到優化疊代的呢?并行 又是如何實作的?下面我們就透過 Stream 源碼剖析 Stream 的實作原理。
1.Stream 操作分類
在了解 Stream 的實作原理之前,我們先來了解下 Stream 的操作分類,因為他的操作分類 其實是實作高效疊代大資料集合的重要原因之一。為什麼這樣說,分析完你就清楚了。
官方将 Stream 中的操作分為兩大類:中間操作(Intermediate operations)和終結操作 (Terminal operations)。中間操作隻對操作進行了記錄,即隻會傳回一個流,不會進行 計算操作,而終結操作是實作了計算操作。
中間操作又可以分為無狀态(Stateless)與有狀态(Stateful)操作,前者是指元素的處理 不受之前元素的影響,後者是指該操作隻有拿到所有元素之後才能繼續下去。
終結操作又可以分為短路(Short-circuiting)與非短路(Unshort-circuiting)操作,前 者是指遇到某些符合條件的元素就可以得到最終結果,後者是指必須處理完所有元素才能得 到最終結果。操作分類詳情如下圖所示:
2.Stream 源碼實作
在了解 Stream 如何工作之前,我們先來了解下 Stream 包是由哪些主要結構類組合而成的,各個類的職責是什麼。參照下圖:
BaseStream 和 Stream 為最頂端的接口類。BaseStream 主要定義了流的基本接口方法, 例如,spliterator、isParallel 等;
Stream 則定義了一些流的常用操作方法,例如, map、filter 等。 ReferencePipeline 是一個結構類,他通過定義内部類組裝了各種操作流。他定義了 Head、StatelessOp、StatefulOp 三個内部類,實作了 BaseStream 與 Stream 的接口方 法。
Sink 接口是定義每個 Stream 操作之間關系的協定,他包含 begin()、end()、 cancellationRequested()、accpt() 四個方法。ReferencePipeline 最終會将整個 Stream 流操作組裝成一個調用鍊,而這條調用鍊上的各個 Stream 操作的上下關系就是通過 Sink 接口協定來定義實作的。
3.Stream 操作疊加
我們知道,一個 Stream 的各個操作是由處理管道組裝,并統一完成資料處理的。在 JDK 中每次的中斷操作會以使用階段(Stage)命名。
管道結構通常是由 ReferencePipeline 類實作的,前面講解 Stream 包結構時,我提到過 ReferencePipeline 包含了 Head、StatelessOp、StatefulOp 三種内部類。
Head 類主要用來定義資料源操作,在我們初次調用 names.stream() 方法時,會初次加載 Head 對象,此時為加載資料源操作;接着加載的是中間操作,分别為無狀态中間操作 StatelessOp 對象和有狀态操作 StatefulOp 對象,此時的 Stage 并沒有執行,而是通過 AbstractPipeline 生成了一個中間操作 Stage 連結清單;當我們調用終結操作時,會生成一個 最終的 Stage,通過這個 Stage 觸發之前的中間操作,從最後一個 Stage 開始,遞歸産生 一個 Sink 鍊。如下圖所示:
這個例子的需求是查找出一個長度最長,并且以張為姓氏的名字。
從代碼角度來看,你可能 會認為是這樣的操作流程:
首先周遊一次集合,得到以“張”開頭的所有名字;然後周遊一 次 filter 得到的集合,将名字轉換成數字長度;最後再從長度集合中找到最長的那個名字并 且傳回。
這裡我要很明确地告訴你,實際情況并非如此。我們來逐漸分析下這個方法裡所有的操作是 如何執行的。 首先 ,因為 names 是 ArrayList 集合,是以 names.stream() 方法将會調用集合類基礎接 口 Collection 的 Stream 方法:
再調用 filter 和 map 方法,這兩個方法都是無狀态的中間操作,是以執行 filter 和 map 操作時,并沒有進行任何的操作,而是分别建立了一個 Stage 來辨別使用者的每一次操作。
而通常情況下 Stream 的操作又需要一個回調函數,是以一個完整的 Stage 是由資料來源、操作、回調函數組成的三元組來表示。如下圖所示,分别是 ReferencePipeline 的 filter 方法和 map 方法:
new StatelessOp 将會調用父類 AbstractPipeline 的構造函數,這個構造函數将前後的 Stage 聯系起來,生成一個 Stage 連結清單: