類似于 EventEmitter,專注于 IO 管道中事件驅動的資料處理方式;類比于數組或者映射,Stream 也是資料的集合,隻不過其代表了不一定正在記憶體中的資料。。Node.js 的 Stream 分為以下類型:
Readable Stream: 可讀流,資料的産生者,譬如 process.stdin
Writable Stream: 可寫流,資料的消費者,譬如 process.stdout 或者 process.stderr
Duplex Stream: 雙向流,即可讀也可寫
Transform Stream: 轉化流,資料的轉化者
Stream 本身提供了一套接口規範,很多 Node.js 中的内模組化塊都遵循了該規範,譬如著名的 <code>fs</code> 子產品,即是使用 Stream 接口來進行檔案讀寫;同樣的,每個 HTTP 請求是可讀流,而 HTTP 響應則是可寫流。

當我們建立某個可讀流時,其還并未開始進行資料流動;添加了 data 的事件監聽器,它才會變成流動态的。在這之後,它就會讀取一小塊資料,然後傳到我們的回調函數裡面。 <code>data</code> 事件的觸發頻次同樣是由實作者決定,譬如在進行檔案讀取時,可能每行都會觸發一次;而在 HTTP 請求處理時,可能數 KB 的資料才會觸發一次。可以參考 nodejs/readable-stream/_stream_readable 中的相關實作,發現 on 函數會觸發 resume 方法,該方法又會調用 flow 函數進行流讀取:
我們還可以監聽 <code>readable</code> 事件,然後手動地進行資料讀取:
Readable Stream 還包括如下常用的方法:
Readable.pause(): 這個方法會暫停流的流動。換句話說就是它不會再觸發 data 事件。
Readable.resume(): 這個方法和上面的相反,會讓暫停流恢複流動。
Readable.unpipe(): 這個方法會把目的地移除。如果有參數傳入,它會讓可讀流停止流向某個特定的目的地,否則,它會移除所有目的地。
在日常開發中,我們可以用 stream-wormhole 來模拟消耗可讀流:
當 <code>end()</code> 被調用時,所有資料會被寫入,然後流會觸發一個 <code>finish</code> 事件。注意在調用 <code>end()</code> 之後,你就不能再往可寫流中寫入資料了。
Writable Stream 中同樣包含一些與 Readable Stream 相關的重要事件:
error: 在寫入或連結發生錯誤時觸發
pipe: 當可讀流連結到可寫流時,這個事件會觸發
unpipe: 在可讀流調用 unpipe 時會觸發
多個管道順序調用,即是建構了連結(Chaining):
管道也常用于 Web 伺服器中的檔案處理,以 Egg.js 中的應用為例,我們可以從 Context 中擷取到檔案流并将其傳入到可寫檔案流中:
可知在典型的流處理場景中,我們不可以避免地要處理所謂的背壓(Backpressure)問題。無論是 Writable Stream 還是 Readable Stream,實際上都是将資料存儲在内部的 Buffer 中,可以通過 <code>writable.writableBuffer</code> 或者 <code>readable.readableBuffer</code> 來讀取。當要處理的資料存儲超過了 <code>highWaterMark</code> 或者目前寫入流處于繁忙狀态時,write 函數都會傳回 <code>false</code>。<code>pipe</code> 函數即會自動地幫我們啟用背壓機制:
當 Node.js 的流機制監測到 write 函數傳回了 <code>false</code>,背壓系統會自動介入;其會暫停目前 Readable Stream 的資料傳遞操作,直到消費者準備完畢。
Duplex Stream 可以看做讀寫流的聚合體,其包含了互相獨立、擁有獨立内部緩存的兩個讀寫流, 讀取與寫入操作也可以異步進行:
我們可以使用 Duplex 模拟簡單的套接字操作:
在開發中我們也經常需要直接将某個可讀流輸出到可寫流中,此時也可以在其中引入 PassThrough,以友善進行額外地監聽:
Transform Stream 則是實作了 <code>_transform</code> 方法的 Duplex Stream,其在兼具讀寫功能的同時,還可以對流進行轉換:
這裡我們實作簡單的 Base64 編碼器: