雙工流就是同時實作了 Readable 和 Writable 的流,即可以作為上遊生産資料,又可以作為下遊消費資料,這樣可以處于資料流動管道的中間部分,即
rs.pipe(rws1).pipe(rws2).pipe(rws3).pipe(ws);
在 NodeJS 中雙工流常用的有兩種
- Duplex
- Transform
實作 Duplex
和 Readable、Writable 實作方法類似,實作 Duplex 流非常簡單,但 Duplex 同時實作了 Readable 和 Writable, NodeJS 不支援多繼承,是以我們需要繼承 Duplex 類
- 繼承 Duplex 類
- 實作 _read() 方法
- 實作 _write() 方法
相信大家對 read()、write() 方法的實作不會陌生,因為和 Readable、Writable 完全一樣。
const Duplex = require('stream').Duplex;
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
}
});
構造函數參數
Duplex 執行個體内同時包含可讀流和可寫流,在執行個體化 Duplex 類的時候可以傳遞幾個參數
- readableObjectMode : 可讀流是否設定為 ObjectMode,預設 false
- writableObjectMode : 可寫流是否設定為 ObjectMode,預設 false
- allowHalfOpen : 預設 true, 設定成 false 的話,當寫入端結束的時,流會自動的結束讀取端,反之亦然。
小例子
了解了 Readable 和 Writable 之後看 Duplex 非常簡單,直接用一個官網的例子
const Duplex = require('stream').Duplex;
const kSource = Symbol('source');
class MyDuplex extends Duplex {
constructor(source, options) {
super(options);
this[kSource] = source;
}
_write(chunk, encoding, callback) {
// The underlying source only deals with strings
if (Buffer.isBuffer(chunk))
chunk = chunk.toString();
this[kSource].writeSomeData(chunk);
callback();
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding));
});
}
}
當然這是不能執行的僞代碼,但是 Duplex 的作用可見一斑,進可以生産資料,又可以消費資料,是以才可以處于資料流動管道的中間環節,常見的 Duplex 流有
- Tcp Scoket
- Zlib
- Crypto
Transform 同樣是雙工流,看起來和 Duplex 重複了,但兩者有一個重要的差別:Duplex 雖然同僚具備可讀流和可寫流,但兩者是相對獨立的;Transform 的可讀流的資料會經過一定的處理過程自動進入可寫流。
雖然會從可讀流進入可寫流,但并不意味這兩者的資料量相同,上面說的一定的處理邏輯會決定如果 tranform 可讀流,然後放入可寫流,transform 原義即為轉變,很貼切的描述了 Transform 流作用。
我們最常見的壓縮、解壓縮用的 zlib 即為 Transform 流,壓縮、解壓前後的資料量明顯不同,兒流的作用就是輸入一個 zip 包,輸入一個解壓檔案或反過來。我們平時用的大部分雙工流都是 Transform。
實作 Tranform
Tranform 類内部繼承了 Duplex 并實作了 writable.write() 和 readable._read() 方法,我們想自定義一個 Transform 流,隻需要
- 繼承 Transform 類
- 實作 _transform() 方法
- 實作 _flush() 方法(可以不實作)
_transform(chunk, encoding, callback) 方法用來接收資料,并産生輸出,參數我們已經很熟悉了,和 Writable 一樣, chunk 預設是 Buffer,除非 decodeStrings 被設定為 false。
在 _transform() 方法内部可以調用 this.push(data) 生産資料,交給可寫流,也可以不調用,意味着輸入不會産生輸出。
當資料處理完了必須調用 callback(err, data) ,第一個參數用于傳遞錯誤資訊,第二個參數可以省略,如果被傳入了,效果和 this.push(data) 一樣
transform.prototype._transform = function (data, encoding, callback) {
this.push(data);
callback();
};
transform.prototype._transform = function (data, encoding, callback) {
callback(null, data);
};
有些時候,transform 操作可能需要在流的最後多寫入可寫流一些資料。例如, Zlib流會存儲一些内部狀态,以便優化壓縮輸出。在這種情況下,可以使用_flush()方法,它會在所有寫入資料被消費、觸發 'end'之前被調用。
Transform 事件
Transform 流有兩個常用的事件
- 來自 Writable 的 finish
- 來自 Readable 的 end
當調用 transform.end() 并且資料被 _transform() 處理完後會觸發 finish,調用_flush後,所有的資料輸出完畢,觸發end事件。
對比
了解了 Readable 和 Writable 之後,了解雙工流十分自然,但兩者的差別會讓一些初學者困惑,簡單的區分:Duplex 的可讀流和可寫流之間并沒有直接關系,Transform 中可讀流的資料會經過處理後自動放入可寫流中。
看兩個簡單的例子就能直覺了解到 Duplex 和 Transform 的差別
TCP socket
net 子產品可以用來建立 socket,socket 在 NodeJS 中是一個典型的 Duplex,看一個 TCP 用戶端的例子
var net = require('net');
//建立用戶端
var client = net.connect({port: 1234}, function() {
console.log('已連接配接到伺服器');
client.write('Hi!');
});
//data事件監聽。收到資料後,斷開連接配接
client.on('data', function(data) {
console.log(data.toString());
client.end();
});
//end事件監聽,斷開連接配接時會被觸發
client.on('end', function() {
console.log('已與伺服器斷開連接配接');
});
可以看到 client 就是一個 Duplex,可寫流用于向伺服器發送消息,可讀流用于接受伺服器消息,兩個流内的資料并沒有直接的關系。
gulp
gulp 非常擅長處理代碼本地建構流程,看一段官網的示例代碼
gulp.src('client/templates/*.jade')
.pipe(jade())
.pipe(minify())
.pipe(gulp.dest('build/minified_templates'));
其中 jada() 和 minify() 就是典型的 Transform,處理流程大概是
.jade 模闆檔案 -> jada() -> html 檔案 -> minify -> 壓縮後的 html
可以看出來,jade() 和 minify() 都是對輸入資料做了些特殊處理,然後交給了輸出資料。
這樣簡單的對比就能看出 Duplex 和 Transform 的差別,在平時實用的時候,當一個流同僚面向生産者和消費者服務的時候我們會選擇 Duplex,當隻是對資料做一些轉換工作的時候我們便會選擇使用 Tranform。