天天看點

Node.js 中流操作實踐

類似于 EventEmitter,專注于 IO 管道中事件驅動的資料處理方式;類比于數組或者映射,Stream 也是資料的集合,隻不過其代表了不一定正在記憶體中的資料。。Node.js 的 Stream 分為以下類型:

Readable Stream: 可讀流,資料的産生者,譬如 process.stdin

Writable Stream: 可寫流,資料的消費者,譬如 process.stdout 或者 process.stderr

Duplex Stream: 雙向流,即可讀也可寫

Transform Stream: 轉化流,資料的轉化者

Stream 本身提供了一套接口規範,很多 Node.js 中的内模組化塊都遵循了該規範,譬如著名的 ​<code>​fs​</code>​ 子產品,即是使用 Stream 接口來進行檔案讀寫;同樣的,每個 HTTP 請求是可讀流,而 HTTP 響應則是可寫流。

Node.js 中流操作實踐

當我們建立某個可讀流時,其還并未開始進行資料流動;添加了 data 的事件監聽器,它才會變成流動态的。在這之後,它就會讀取一小塊資料,然後傳到我們的回調函數裡面。 ​<code>​data​</code>​ 事件的觸發頻次同樣是由實作者決定,譬如在進行檔案讀取時,可能每行都會觸發一次;而在 HTTP 請求處理時,可能數 KB 的資料才會觸發一次。可以參考 ​​nodejs/readable-stream/_stream_readable​​ 中的相關實作,發現 on 函數會觸發 resume 方法,該方法又會調用 flow 函數進行流讀取:

我們還可以監聽 ​<code>​readable​</code>​ 事件,然後手動地進行資料讀取:

Readable Stream 還包括如下常用的方法:

Readable.pause(): 這個方法會暫停流的流動。換句話說就是它不會再觸發 data 事件。

Readable.resume(): 這個方法和上面的相反,會讓暫停流恢複流動。

Readable.unpipe(): 這個方法會把目的地移除。如果有參數傳入,它會讓可讀流停止流向某個特定的目的地,否則,它會移除所有目的地。

在日常開發中,我們可以用 ​​stream-wormhole​​ 來模拟消耗可讀流:

當 ​<code>​end()​</code>​ 被調用時,所有資料會被寫入,然後流會觸發一個 ​<code>​finish​</code>​ 事件。注意在調用 ​<code>​end()​</code>​ 之後,你就不能再往可寫流中寫入資料了。

Writable Stream 中同樣包含一些與 Readable Stream 相關的重要事件:

error: 在寫入或連結發生錯誤時觸發

pipe: 當可讀流連結到可寫流時,這個事件會觸發

unpipe: 在可讀流調用 unpipe 時會觸發

多個管道順序調用,即是建構了連結(Chaining):

管道也常用于 Web 伺服器中的檔案處理,以 Egg.js 中的應用為例,我們可以從 Context 中擷取到檔案流并将其傳入到可寫檔案流中:

可知在典型的流處理場景中,我們不可以避免地要處理所謂的背壓(Backpressure)問題。無論是 Writable Stream 還是 Readable Stream,實際上都是将資料存儲在内部的 Buffer 中,可以通過 ​<code>​writable.writableBuffer​</code>​​ 或者 ​<code>​readable.readableBuffer​</code>​​ 來讀取。當要處理的資料存儲超過了 ​<code>​highWaterMark​</code>​​ 或者目前寫入流處于繁忙狀态時,write 函數都會傳回 ​<code>​false​</code>​​。​<code>​pipe​</code>​ 函數即會自動地幫我們啟用背壓機制:

當 Node.js 的流機制監測到 write 函數傳回了 ​<code>​false​</code>​,背壓系統會自動介入;其會暫停目前 Readable Stream 的資料傳遞操作,直到消費者準備完畢。

Duplex Stream 可以看做讀寫流的聚合體,其包含了互相獨立、擁有獨立内部緩存的兩個讀寫流, 讀取與寫入操作也可以異步進行:

我們可以使用 Duplex 模拟簡單的套接字操作:

在開發中我們也經常需要直接将某個可讀流輸出到可寫流中,此時也可以在其中引入 PassThrough,以友善進行額外地監聽:

Transform Stream 則是實作了 ​<code>​_transform​</code>​ 方法的 Duplex Stream,其在兼具讀寫功能的同時,還可以對流進行轉換:

這裡我們實作簡單的 Base64 編碼器:

繼續閱讀