天天看點

極簡 Node.js 入門 - 4.4 可寫流

極簡 Node.js 入門系列教程:https://www.yuque.com/sunluyong/node

本文更佳閱讀體驗:https://www.yuque.com/sunluyong/node/writable

什麼是可寫流

可寫流是對資料流向裝置的抽象,用來消費上遊流過來的資料,通過可寫流程式可以把資料寫入裝置,常見的是本地磁盤檔案或者 TCP、HTTP 等網絡響應。看一個之前用過的例子

process.stdin.pipe(process.stdout);
           

process.stdout 是一個可寫流,程式把可讀流 process.stdin 傳過來的資料寫入的标準輸出裝置。在了解了可讀流的基礎上了解可寫流非常簡單,流就是有方向的資料,其中可讀流是資料源,可寫流是目的地,中間的管道環節是雙向流。

可寫流使用

調用可寫流執行個體的 _write() _方法就可以把資料寫入可寫流

const fs = require('fs');
const rs = fs.createReadStream('./w.js');
const ws = fs.createWriteStream('./copy.js');

rs.setEncoding('utf-8');

rs.on('data', chunk => {
  ws.write(chunk);
});
           

前面提到過監聽了可讀流的 data 事件就會使可讀流進入流動模式,我們在回調事件裡調用了可寫流的 write() 方法,這樣資料就被寫入了可寫流抽象的裝置中,也就是目前目錄下的 copy.js 檔案

write() 方法有三個參數

  • chunk {String| Buffer},表示要寫入的資料
  • encoding 當寫入的資料是字元串的時候可以設定編碼
  • callback 資料被寫入之後的回調函數

自定義可寫流

和自定義可讀流類似,簡單的自定義可寫流隻需要兩步

  1. 繼承 stream 子產品的 Writable 類
  2. 實作 _write() 方法

用個簡單例子示範可寫流實作,把傳入可寫流的資料轉成大寫之後輸出到标準輸出裝置 stdout

const Writable = require('stream').Writable
class OutputStream extends Writable {
    _write(chunk, enc, done) {
        // 轉大寫之後寫入标準輸出裝置
        process.stdout.write(chunk.toString().toUpperCase());
        // 此處不嚴謹,應該是監聽寫完之後才調用 done
        process.nextTick(done);
    }
}
module.exports = OutputStream;
           

和最終可寫流暴露出來的 write() 方法一樣, _write() 方法有三個參數,作用類似

  • chunk 寫入的資料,大部分時候是 buffer,除非 decodeStrings 被設定為 false
  • encoding 如果資料是字元串,可以設定編碼,buffer 或者 object 模式會忽略
  • callback 資料寫入後的回調函數,可以通知流傳入下一個資料;當出現錯誤的時候也可以設定一個 error 參數

除了在流實作中的 _write() 之外,還可以實作 _writev() 方法,一次處理多個資料塊,這個方法用于被滞留的資料寫入隊列調用,可以不實作

執行個體化可寫流 options

有了可寫流的類之後可以執行個體化使用了,執行個體化可寫流的時候有幾個 option 可選,了解一下接下來要用到的三個核心 options

  • objectMode 預設是 false, 設定成 true 後 writable.write() 方法除了寫入 string 和 buffer 外,還可以寫入任意 JavaScript 對象。很有用的一個選項,後面介紹 transform 流的時候詳細介紹
  • highWaterMark 每次最多寫入的資料量, Buffer 的時候預設值 16kb, objectMode 時預設值 16
  • decodeStrings 是否把傳入的資料轉成 Buffer,預設是 true

這樣就更清楚的知道 _write() 方法傳入的參數的含義了,而且對後面介紹 back pressure 機制的了解很有幫助。

事件

和可讀流一樣,可寫流也有幾個常用的事件,有了可讀流的基礎,了解起來比較簡單

**pipe**

  當可讀流調用 pipe() 方法向可寫流傳輸資料的時候會觸發可寫流的 pipe 事件

**unpipe**

  當可讀流調用 unpipe() 方法移除資料傳遞的時候會觸發可寫流的 unpipe 事件

這兩個事件用于通知可寫流資料将要到來和将要被切斷,在通常情況下使用的很少

writeable.write() 方法是有一個 bool 的傳回值的,前面提到了 highWaterMark,當要求寫入的資料大于可寫流的 highWaterMark 的時候,資料不會被一次寫入,有一部分資料被滞留,這時候 writeable.write() 就會傳回 false,如果可以處理完就會傳回 true

**drain**

 當之前存在滞留資料,也就是 writeable.write() 傳回過 false,經過一段時間的消化,處理完了積壓資料,可以繼續寫入新資料的時候觸發(drain 的本意即為排水、枯竭,挺形象的)

除了 write() 方法可寫流還有一個常用的方法 end(),參數和 write() 方法相同,但也可以不傳入參數,表示沒有其它資料需要寫入,可寫流可以關閉了

**finish**

 當調用 writable.end() 方法,并且所有資料都被寫入底層後會觸發 finish 事件,同樣出現錯誤後會觸發

error

 ** **事件

back pressure

了解了這些事件,結合上之前提到的可讀流的一些知識,就能探讨一些有意思的話題了。前面章節提到過用流相對于直接操作檔案的好處之一是不會把記憶體壓爆,那麼流是怎麼做到的呢?

很容易聯想到流不是一次性把所有資料載入記憶體處理,而是一邊讀一邊寫。但一般資料讀取的速度會遠遠快于寫入的速度,那麼 pipe() 方法是怎麼做到供需平衡的呢?主要靠以下三個要點

  1. 可讀流有流動和暫停兩種模式,可以通過 **pause() 和 resume() **方法切換
  2. 可寫流的 **write() **方法會傳回是否能處理目前的資料,每次可以處理多少是 highWatermark 決定的
  3. 當可寫流處理完了積壓資料會觸發 drain 事件

可以利用這三點來做到資料讀取和寫入的同步,還是使用之前的例子,但為了使消費速度降下來,刻意隔一秒再通知完成

class OutputStream extends Writable {
    _write(chunk, enc, done) {
        // 轉大寫之後寫入标準輸出裝置
        process.stdout.write(chunk.toString().toUpperCase());
        // 故意延緩通知繼續傳遞資料的時間,造成寫入速度慢的現象
        setTimeout(done, 1000);
    }
}
           

使用一下自定義的兩個類

const RandomNumberStream = require('./RandomNumberStream');
const OutputStream = require('./OutputStream');

const rns = new RandomNumberStream(100);
const os = new OutputStream({
    highWaterMark: 8 // 把水位降低,預設16k還是挺大的
});

rns.on('data', chunk => {
    // 當待處理隊列大于 highWaterMark 時傳回 false
    if (os.write(chunk) === false) { 
        console.log('pause');
        rns.pause(); // 暫停資料讀取
    }
});

// 當待處理隊列小于 highWaterMark 時觸發 drain 事件
os.on('drain', () => {
    console.log('drain')
    rns.resume(); // 恢複資料讀取
});
           

結合前面的三點和注釋很容易看懂上面代碼,這就是 pipe() 方法起作用的核心原理,官方教程中也有對 back presure 機制的詳細講解

對資料的來源的去向有了大概了解,就可以學習使用雙向流對資料進行加工了

  • duplex
  • transform