關于作者:藍邦珏,騰訊前端工程師,15年加入騰訊SNG增值産品部,期間主要負責過手Q閱讀、手Q動漫項目的業務開發。業餘喜歡折騰前端新技術和寫文章。
作為前端,我們常常會和 Stream 有着頻繁的接觸。比如使用 gulp 對項目進行建構的時候,我們會使用 gulp.src 接口将比對到的檔案轉為 stream(流)的形式,再通過 .pipe() 接口對其進行鍊式加工處理;
或者比如我們通過 http 子產品建立一個 HTTP 服務:
此處的 req 和 res 也屬于 Stream 的消費接口(前者為 Readable Stream,後者為 Writable Stream)。
事實上像上述的 req/res,或者 process.stdout 等接口都屬于 Stream 的執行個體,是以較少存在情況,是需要我們手動引入 Stream 子產品的,例如:
如果不太能讀懂上述代碼,或者對 Stream 的概念感到模糊,那麼可以放輕松,因為本文會進一步地對 Stream 進行剖析,并且談談直接使用它可能會存在的一些問題(這也是為何 gulp 要使用 through2 的原因)。
另外本文的示例均可在我的 github 倉庫(https://github.com/VaJoy/stream/) 擷取到,讀者可以自行下載下傳和調試。
在介紹 Stream(流)之前,我們先來看一個例子 —— 模拟伺服器把本地某個檔案内容吐給用戶端:
這段代碼雖然可以正常執行,但存在一個顯著的問題 —— 對于每一個用戶端的請求,fs.readFile 接口都會把整個檔案都緩存到記憶體中去,然後才開始把資料吐給使用者。那麼當檔案體積很大、請求也較多(且特别當請求來自慢速使用者)的時候,伺服器需要消耗很大的記憶體,導緻性能低下。
然而這個問題,則正是 stream 發揮所長的地方。如前文提及的,res 是流對象,那我們正好可以将其利用起來:
在上方代碼段裡,fs.createReadStream 建立了 data.txt 的可讀流(Readable Stream)。這裡需要事先了解的是,流可以簡單地分為“可讀的(readable)”、“可寫的(writable)”,或者“讀寫均可”三種類型,且所有的流都屬于 EventEmitter 的執行個體。
回到代碼,對于建立的可讀流,我們通過 .pipe() 接口來監聽其 data 和 end 事件,并把 data.txt (的可讀流)拆分成一小塊一小塊的資料(chunks),像流水一樣源源不斷地吐給用戶端,而不再需要等待整個檔案都加載到記憶體後才發送資料。
其中 .pipe 可以視為流的“管道/通道”方法,任何類型的流都會有這個 .pipe 方法去成對處理流的輸入與輸出。
為了友善了解,我們把上述兩種方式(不使用流/使用流)處理為如下的情景:
⑴ 不使用流:

⑵ 使用流:
由此可以得知,使用流(stream)的形式,可以大大提升響應時間,又能有效減輕伺服器記憶體的壓力。
在上文我們曾提及到,stream 可以按讀寫權限來簡單地分做三類,不過這裡我們再細化下,可以把 stream 歸為如下五個類别:
⑴ Readable Streams
⑵ Writable Streams
⑶ Transform Streams
⑷ Duplex Streams
⑸ Classic Streams
其中 Transform Streams 和 Duplex Streams 都屬于即可讀又可寫的流,而最後一個 Classic Streams 是對 Node 古早版本上的 Stream 的一個統稱。我們将照例對其進行逐一介紹。
即可讀流,通過 .pipe 接口可以将其資料傳遞給一個 writable、transform 或者 duplex流:
<code>readableStream.pipe(dst)</code>
常見的 Readable Streams 包括:
用戶端上的 HTTP responses
服務端上的 HTTP requests
fs read streams
zlib streams
crypto streams
TCP sockets
子程序的 stdout 和 stderr
process.stdin
例如在前面 demo2 的代碼段中,我們就使用了 fs.createReadStream 接口來建立了一個 fs read stream:
這裡有個有趣的地方 —— 雖然 Readable Streams 稱為可讀流,但在将其傳入一個消耗對象之前,它都是可寫的:
執行結果:
在這段代碼中,我們通過 readStream.push(data) 的形式往可讀流裡注入資料,并以 readStream.push(null) 來結束可讀流。
不過這種寫法有個弊端 —— 從使用 .push() 将資料注入 readable 流中開始,直到另一個東西(process.stdout)來消耗資料之前,這些資料都會存在緩存中。
這裡有個内置接口 ._read() 可以用來處理這個問題,它是從系統底層開始讀取資料流時才會不斷調用自身,進而減少緩存備援。
我們可以回過頭來看 demo1 的例子:
我們是在 ._read 方法中才使用 readStream.push(data) 往可讀流裡注入資料供下遊消耗(也會流經緩存),進而提升流處理的性能。
這裡也有個小問題 —— 上一句話所提到的“供下遊消耗”,這個下遊通常又會以怎樣的形式來消耗可讀流的呢?
首先,可以使用我們熟悉的 .pipe() 方法将可讀流推送給一個消耗對象(writable、transform 或者 duplex流):
其次,也可以通過監聽可讀流的“data”事件(别忘了文章前面提到的“所有的流都屬于 EventEmitter 的執行個體”)來實作消耗處理 —— 在首次監聽其 data 事件後,readStream 便會持續不斷地調用 _read(),通過觸發 data 事件将資料輸出。當資料全部被消耗時,則觸發 end 事件。
示例:
執行結果為:
這裡需要留意的是,在使用 .push() 往可讀流裡注入資料的代碼段,我們使用了 setTimeout 将其包裹起來,這是為了讓系統能有足夠時間優先處理接收流結束信号的事務。當然你也可以改寫為:
Writable(可寫)流接口是對寫入資料的目标的抽象:
src.pipe(writableStream)
常見的 Writable Streams 包括:
用戶端的 HTTP requests
服務端的 HTTP responses
fs write streams
子程序的 stdin
process.stdout 和 process.stderr
可寫流有兩個重要的方法:
writableStream.write(chunk[, encoding, callback]) —— 往可寫流裡寫入資料;
writableStream.end([chunk, encoding, callback]) —— 停止寫入資料,結束可寫流。在調用 .end() 後,再調用 .write() 方法會産生錯誤。
上方兩方法的 encoding 參數表示編碼字元串(chunk為String時才可以用)。
write 方法的 callback 回調參數會在 chunk 被消費後(從緩存中移除後)被觸發;end 方法的 callback 回調參數則在 Stream 結束時觸發。
另外,如同通過 readable._read() 方法可以處理可讀流,我們可以通過 writable._write(chunk, enc, next) 方法在系統底層處理流寫入的邏輯中,對資料進行處理。
其中參數 chunk 代表寫進來的資料;enc 代表編碼的字元串;next(err) 則是一個回調函數,調用它可以告知消費者進行下一輪的資料流寫入。
執行如下:
Duplex 是雙工的意思,是以很容易猜到 Duplex 流就是既能讀又能寫的一類流,它繼承了 Readable 和 Writable 的接口。
常見的 Duplex Streams 有:
Transform Stream 是在繼承了 Duplex Streams 的基礎上再進行了擴充,它可以把寫入的資料和輸出的資料,通過 ._transform 接口關聯起來。
常見的 Transform Streams 有:
其中的 _transform 是 Transform Streams 的内置方法,所有 Transform Streams 都需要使用該接口來接收輸入和處理輸出,且該方法隻能由子類來調用。
_transform 接口格式如下:
transform._transform(chunk, encoding, callback)
第一個參數表示被轉換(transformed)的資料塊(chunk),除非構造方法 option 參數(可選)傳入了 “decodeString : false”,否則其類型均為 Buffer;
第二個參數用于設定編碼,但隻有當 chunck 為 String 格式(即構造方法傳入 “decodeString : false”參數)的時候才可配置,否則預設為“buffer”;
第三個參數 callback 用于在 chunk 被處理後調用,通知系統進入下一輪 _transform 調用。該回調方法接收兩個可選參數 —— callback([error, data]),其中的 data 參數可以将 chunck 寫入緩存中(供更後面的消費者去消費):
另外 Transform Streams 還有一個 _flush(callback) 内置方法,它會在沒有更多可消耗的資料時、在“end”事件之前被觸發,而且會清空緩存資料并結束 Stream。
該内置方法同樣隻允許由子類來調用,而且執行後,不能再調用 .push 方法。
關于 Transform Streams 的更多細節還可以參考這篇文章,推薦閱讀。
在較早版本的 NodeJS 裡,Stream 的實作相較簡陋,例如上文提及的“Stream.Readable”接口均是從 Node 0.9.4 開始才有,是以我們往往需要對其進行多次封裝擴充才能更好地用來開發。
而 Classic Streams 便是對這種古舊模式的 Stream 接口的統稱。
需要留意的是,隻要往任意一個 stream 注冊一個“data”事件監聽器,它就會自動切換到“classic”模式,并按照舊的 API 去執行。
classic 流可以當作一個帶有 .pipe 接口的事件發射器(event emitter),當它要為消耗者提供資料時會發射“data”事件,當要結束生産資料時,則發射“end”事件。
另外隻有當設定 Stream.readable 為 true 時,.pipe 接口才會将目前流視作可讀流:
另外,Classic readable streams 還有 .pause() 和 .resume() 兩個接口可用于暫停/恢複流的讀取:
對于可讀流來說,push(data) 時,data 的類型隻能是 String 或Buffer,且消耗時 data 事件輸出的資料類型都為 Buffer;
對于可寫流來說,write(data) 時,data 的類型也隻能是 String 或 Buffer,_write(data) 調用時所傳進來的 data 類型都為 Buffer。
不過,為了增強資料類型的靈活性,無論是可讀流或是可寫流,隻需要往其構造函數裡傳入配置參數“{ objectMode: true }”,便可往流裡傳入/擷取任意類型(null除外)的資料:
在前文我們介紹了 classic streams,它屬于陳舊版本的 Node 上的 Stream 接口,可以把它稱為 Streams1。而從 Node 0.10 開始,Stream 新增了系列實用的新接口,可以做更多除了 .pipe() 之外的事情,我們把其歸類為 Streams2(事實上,在 Node 0.11+開始,Stream有些許新的變動,從該版本開始的 Stream 也可稱為 Streams3)。
那麼這裡存在一個問題 —— 那些使用了 Stream1 的項目(特别是 npm 包),想更新使用環境的 Node 版本到 0.10+,會否導緻相容問題呢?
還好 Streams2 雖然改頭換面,但本質上是設計為向後相容的。
打個比方,如果你同時推送了一條 Streams2 流和一條舊格式的、基于事件發射器的流,Stream2 将降級為舊模式(shim mode)來向後相容。
但是,如果我們的開發環境使用的是 Node 0.8(且因為某些原因不能更新),但又想使用 Streams2 的API怎麼辦呢?或者比如 npm 上的某些開源的工具包,想要擁抱 Streams2 的便利,又想保持對使用 Node 0.8 的使用者進行相容處理,這樣又得怎麼處理?
針對上述問題,早在 Node 0.10 釋放之前,Issacs 就把 Node-core 中操作 Stream 的核心接口獨立拷貝了一份出來,開源到了 npm 上并持續更新,它就是 readable-stream。
通過使用 readable-stream,我們就可以在那些核心裡沒有 Streams2/3 的低版本 Node 中,直接使用 Streams2/3:
var Readable = require('stream').Readable || require('readable-stream').Readable
readable-stream 現在有 v1.0.x 和 v1.1.x 兩個主要版本,前者跟進 Streams2 的疊代,後者跟進 Streams3 的疊代,使用者可以根據需求使用對應版本的包。
readable-stream 雖然提供了一個 Streams 的相容方案,但我們也希望能對 Stream 複雜的API進行精簡。
而 through2 便基于 readable-stream 對 Stream 接口進行了封裝,并提供了更簡單和靈活的方法。
through2 會為你生成 Transform Streams(貌似舊版本是 Duplex Streams)來處理任意你想使用的流 —— 如前文介紹,相比其它流,Transform 流處理起資料會更加靈活友善。
來看下 through2 的示例:
使用 through2.obj 接口操作 Object Mode 下的流:
對比原生的 Stream API,through2 簡潔了不少,加上有 readable-stream 依賴加持,也很好了解為何像 gulp 及其插件都會使用 through2 來操作和處理 stream 了。
以上是本文對 Stream 的一個介紹,但事實上 Stream 還有許多未露面的 API,感興趣的同學可以直接閱讀官方 API文檔做進一步了解。
共勉~
Reference
⑴ Stream API Doc - https://nodejs.org/api/stream.html
⑵ stream-handbook - https://github.com/substack/stream-handbook
⑶ Node.js Stream - 基礎篇 - http://www.cnblogs.com/zapple/p/5759670.html
⑷ Why I don't use Node's core 'stream' module - https://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html