天天看點

node createwritestream 寫入不覆寫原内容_[譯] 你所需要知道的關于 Node.js Streams 的一切...

node createwritestream 寫入不覆寫原内容_[譯] 你所需要知道的關于 Node.js Streams 的一切...

譯者: 翻譯的死月 | 

轉自: 死月的紅魔館程式設計之地 | 原文位址: 

https://medium.freecodecamp.org/node-js-streams-everything-you-need-to-know-c9141306be93

Node.js streams(流)因其晦澀難懂以及難以使用而聞名。不過讀了這篇文章之後,這些都難不倒你了。

這幾年,很多工程師都開發了一些為了使 stream 更易用的包。而這篇文章将聚焦于官方的 Node.js streams 文檔。

Stream 是 Node.js 中最好的但又最被大家所誤解東西。

—— Dominic Tarr

流(Stream)到底是什麼?

流就是一系列的資料——就跟數組或者字元串一樣。有一點不同,就是 stream 可能無法在一次性全部可用,且它們不需要與記憶體完全合槽。這麼一來,stream 在處理大量資料,或者操作一個一次隻給出一部分資料的資料源的時候顯得格外有用。

其實,流不隻是在操作大量資料的時候有用。它還為在代碼中使用各種強大的組合類功能提供能力。例如,我們在 Linux 指令行中可以通過管道(pipe)來完成一些組合性的指令,在 Node.js 的流中也能實作。

~/learn-node $ grep -R exports * | wc -l 6

上面的指令行在 Node.js 中可以這麼實作:

const grep = ... // grep 輸出流

const wc = ... // wc 輸入流

grep.pipe(wc)

很多 Node.js 的内置子產品都是基于流接口的:

node createwritestream 寫入不覆寫原内容_[譯] 你所需要知道的關于 Node.js Streams 的一切...

上圖清單中就是一些使用了流的原生 Node.js 對象。其中有一些對象甚至是既可讀又可寫的,例如 TCP socket、zlib 以及 crypto 等。

值得注意的是上面說的一些對象也是彼此緊密聯系的。例如 HTTP 響應在用戶端中是一個可讀流,而在服務端則是一個可寫流。畢竟在 HTTP 場景中,我們在用戶端側是從相應對象(

http.IncommingMessage

)讀取資料,而在服務端則是寫入資料(

http.ServerResponse

)。

還需要注意的是,

stdio

相應的流(

stdin

,

stdout

,

stderr

)在子程序中與主程序都是相反的流類型。這樣一來主程序和子程序直接就可以友善地 pipe

stdio

資料了。

小試牛刀

Talk is cheap, show me the code. 讓我們來看看一個例子,來示範流中的記憶體差異。

首先建立一個大大大大大大大檔案:

const fs = require('fs');

const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {

 file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');

}

file.end();

看看我在建立檔案的時候用了什麼。一個可寫流(Writable stream)!

fs

子產品可以讓你用流來寫入或者讀取檔案。在上面的例子中,我們在一個一百萬次的循環中用一個可寫流寫了一個大檔案

big.file

運作完這段代碼後,你會得到一個将近 400 MB 的檔案。

接下去是一個托管這個

big.file

的 Node.js web 服務端:

const fs = require('fs');

const server = require('http').createServer();

server.on('request', (req, res) => {

 fs.readFile('./big.file', (err, data) => {

   if(err) throw err;

   res.end(data);

 });

});

server.listen(8000);

當服務端進來一個請求,它就會通過

fs.readFile

來異步讀取檔案并傳回。但是,哇,你看它并沒有阻塞住我們的事件循環,或者其它一些東西。真膩害,是還是是還是是呢?

到底是不是呢?我們先來看看服務端到底發生了什麼吧,啟動服務端,然後看看記憶體消耗。

當我運作服務端的時候,它的記憶體消耗很正常,8.7 MB:

node createwritestream 寫入不覆寫原内容_[譯] 你所需要知道的關于 Node.js Streams 的一切...

然後我連接配接到服務端。各部門注意,看看記憶體消耗了:

node createwritestream 寫入不覆寫原内容_[譯] 你所需要知道的關于 Node.js Streams 的一切...

卧槽!一下子就跳到了 434.8 MB!

我們就是簡單地把整個

big.file

檔案的内容放到了記憶體中,然後再把它傳輸給響應對象。真特麼低效。

其實 HTTP 響應對象(也就是上面代碼中的

res

)是一個可寫流。這就意味着如果我們有一個可以代表

big.file

的可讀流,那麼我們隻需要将這兩者管道(pipe)接起來就能得到幾乎一樣的效果,而且根本用不了那麼多記憶體。

Node.js 的

fs

子產品中有一個

createReadStream

方法,可以讓你從任意檔案中建立一個可讀流。我們隻要把這個流與響應對象

pipe

起來就可以了:

const fs = require('fs');

const server = require('http').createServer();

server.on('request', (req, res) => {

 const src = fs.createReadStream('./big.file'); // 就是

 src.pipe(res);                                 // 這兩句

});

server.listen(8000);

現在你再去連接配接伺服器的時候,奇迹出現了(看看記憶體消耗吧):

node createwritestream 寫入不覆寫原内容_[譯] 你所需要知道的關于 Node.js Streams 的一切...

你自己說說看發生了什麼吧。

當一個用戶端請求這個大檔案的時候,我們每次隻傳回一塊内容(chunk),也就是說我們不需要一次性把整個大象放到冰箱裡。記憶體大約隻增長了 25 MB。

你還能把這個樣例給改到極限——把寫檔案的循環改到 500 萬次,這樣一來生成的檔案就超過 2 GB 了,也就是說超過了 Node.js 的預設記憶體上線。

如果這個時候你還是用

fs.readFile

來傳遞檔案的話,預設情況下是做不到的,除非你改 Node.js 的預設記憶體上限。但是如果你用的是

fs.createReadStream

的話,2 GB 的流式檔案傳輸根本不會成為問題,而且記憶體使用量基本上會穩定在很小的量。

那麼,準備好學習流了嗎?

這篇文章是我 Pluralsight 課程中關于 Node.js 的部分内容。我在課程中還提供了相應的視訊教程。

造流 101

Node.js 中的流有 4 種基本類型:Readable(可讀流)、Writable(可讀流)、Duplex(雙工流)和 Transform(變形金剛流)。

  • 可讀流是對于可被消耗的資料源的抽象。例如 

    fs.createReadStream

     方法;
  • 可寫流是對于可被寫入的資料目标的抽象。例如 

    fs.createWriteStream

     方法;
  • 雙工流是可讀流與可寫流的集合體。例如 TCP socket;
  • 變形金剛流基本上就是一個雙工流,隻不過在讀寫的時候可以修改或者轉化資料,例如 

    zlib.createGzip

     就将資料使用 gzip 壓縮了。你可以将變形金剛流看成是一個函數,其中輸入是可寫流,而輸出是一個可讀流。

所有的流都是繼承自

EventEmitter

。也就是說,它們觸發的事件可以用于讀寫資料。不過,我們也可以簡單粗暴地用

pipe

來消費流資料。

pipe 方法

年輕人,你要謹記下面這行魔法:

readableSrc.pipe(writableDest)

這麼簡單一行,我們就将資料源,也就是可讀流的輸出給嫁接到資料目标,也就是可寫流的輸入中去了。資料源必須是一個可讀流,而資料目标得是一個可寫流。當然了,雙工流和變形金剛流既可以是資料源也可以是資料目标。事實上,如果我們把資料嫁接到一個雙工流去,我們就可以像 Linux 一樣進行鍊式調用了:

readableSrc

 .pipe(transformStream1)

 .pipe(transformStream2)

 .pipe(finalWrtitableDest)

pipe

方法會傳回資料目标流,是以我們才能進行鍊式調用。對于

a

(可讀流)、

b

c

(雙工流)、

d

(可寫流)來說,我們就可以用各種姿勢玩:

a.pipe(b).pipe(c).pipe(d)

# 等效于

a.pipe(b)

b.pipe(c)

c.pipe(d)

# 在 Linux下等效于

$ a | b | c | d

pipe

方法是消費流資料最簡單的方法。我的建議是要麼使用

pipe

方法,要麼通過事件來消耗,但是要避免二者混合使用。而且通常,如果你用了

pipe

,你就用不到事件了,但如果你想用更自由的形式來消費流資料,那麼你可能就需要用事件了。

流事件

pipe

在讀取和寫入資料的時候,還會自動做一些其它的管理相關的事情。例如它會處理錯誤、檔案結束符(end-of-files)以及當一個流的流速比另一個流要快或者慢的情況。

不過就算這樣,你也還是可以自行直接使用事件來消費流資料。下面的代碼是一個最簡單的與

pipe

方法等效的樣例:

# readable.pipe(writable)

readable.on('data', (chunk) => {

 writable.write(chunk);

});

readable.on('end', () => {

 writable.end();

});

下面是可讀流和可寫流中一些重要的事件和函數:

node createwritestream 寫入不覆寫原内容_[譯] 你所需要知道的關于 Node.js Streams 的一切...

這些函數和時間密切相關,因為它們通常一起被使用。

在可讀流中,幾個重要的事件分别是:

  • data

     事件,當流中傳出一塊資料給消費者的時候會觸發這個事件;
  • end

     事件,當沒有更多資料了的時候觸發該事件;

在可寫流中,幾個重要的事件分别是:

  • drain

     事件,該事件觸發後就表示可寫流可以寫入資料了;
  • finish

     事件,該事件觸發後表示資料已經寫入到下層系統了。

事件和函數組合起來,可以自定義和優化流的使用。我們可以通過

pipe

/

unpipe

函數來消費可讀流,也可以通過

read

/

unshift

/

resume

等。我們可以将可寫流作為

pipe

/

unpipe

的參數傳入,或者直接調用可寫流的

write

,當寫入結束的時候可以調用

end

函數。

可讀流的暫停與流動模式

可讀流有兩種模式來影響我們消費流:

  • 暫停(Paused)模式;
  • 流動(Flowing)模式。

某種意義上,我們可以将其類比于拉(pull)模式與推(push)模式。

預設情況下,所有的可讀流都是以暫停模式啟動的,但是可以輕松切換為流動模式,然後在需要的時候切回暫停狀态。有時候這個切換會自動執行。

當可讀流處于暫停模式的時候,我們可以通過

read()

函數來按需讀取,但是對于流動模式來說,資料是源源不斷進來的,這時候我們就需要通過監聽來消耗它了。

敲黑闆,在流動模式下,如果沒有消費者去處理這些資料,實際上可能會丢失資料。這就是為什麼當我們的可讀流處于流動模式的時候,我們需要一個事件處理函數去監聽這個事件。實際上,添加一個資料事件監聽函數,就會自動将流動模式切換成暫停模式,删除監聽則會切換回來。這麼做的原因是為了向後相容老的 Node.js 流接口。

如果你要手動切換的話,隻需要使用

resume()

pause()

函數。

node createwritestream 寫入不覆寫原内容_[譯] 你所需要知道的關于 Node.js Streams 的一切...

當使用

pipe

方法去消費一個可讀流的時候,我們不需要關心這些東西,因為

pipe

函數内部自動做了這些相關的處理。

實作流

當我們在談論 Node.js 流的時候,其實主要有兩件事可做:

  • 實作一個流;
  • 消費流。

先前我們隻講了如何消費一個流。接下去讓我們自行動手實作一個吧!

流的實作通常以

require

這個

stream

子產品開始。

實作一個可寫流

我們使用

stream

子產品的

Writable

類來實作一個可讀流:

const{ Writable} = require('stream')

我們可以用好幾種姿勢來實作一個可讀流。例如,我們可以從

Writable

繼承一個類:

class myWritableStream extends Writable{

}

我個人是喜歡更簡單的構造方法。我們隻需要執行個體化一個

Writable

對象,并往裡面傳入幾個參數。這些參數中唯一的必選參數是

write

方法,用于實作寫入一塊(chunk)資料。

const{ Writable} = require('stream');

const outStream = newWritable({

 write(chunk, encoding, callback) {

   console.log(chunk.toString());

   callback();

 }

});

process.stdin.pipe(outStream);

write

函數有三個參數。

  • chunk

     通常是一個 Buffer,除非我們用了别的奇葩姿勢;
  • encoding

     參數指的就是編碼,實際上我們通常可以忽略它;
  • callback

     是我們在寫完資料後需要調用一下的回調函數。它相當于是告知調用方資料寫入成功或者失敗的信标。如果寫入失敗,在調用 

    callback

     函數的時候傳入一個錯誤對象即可。

outStream

中,我們隻是簡單地将一塊資料給

console.log

出來,并緊接着調用回調函數

callback

,不傳入任何錯誤參數,表示寫入成功了。這是一個非常簡單但并沒什麼亂用的

echo

流。它隻是把它接收到的資料原樣輸出而已。

如果要消費這個流,我們隻需要與

process.stdin

這個可讀流一起使用,也就是說隻需要簡單地将

process.stdin

pipe

outStream

當我們運作上面的代碼,我們輸入給

process.stdin

的内容就會被

outStream

又給原封不動地

console.log

出來。

這東西真的沒什麼卵用,畢竟已經内置被實作了。它基本上等價于

process.stdout

。我們可以簡單地将

stdin

給 pipe 到

stdout

,那麼我們将會得到一樣的效果:

process.stdin.pipe(process.stdout);

實作一個可讀流

如果要實作一個可讀流,我們隻需要

require

Readable

,然後在執行個體化的時候傳入一個對象計科。其中我們需要實作

read()

方法:

const{ Readable} = require('stream');

const inStream = newReadable({

 read() {}

});

實作一個可讀流非常簡單。我們隻需要直接往裡

push

待消費資料。

const{ Readable} = require('stream');

const inStream = newReadable({

 read() {}

});

inStream.push('ABCDEFGHIJKLM');

inStream.push('NOPQRSTUVWXYZ');

inStream.push(null); // No more data

inStream.pipe(process.stdout);

當我們

push

一個

null

對象,就表示我們告知流,沒有更多資料了。

我們可以将該可讀流直接

pipe

process.stdout

來消費這些資料。

當我們執行上面的代碼,我們就會從

inStream

讀取資料并将其輸出到标準輸出中。非常簡單,但還是沒什麼卵用。

我們剛才在

pipe

process.stdout

之前就把所有的資料都推入流中。實際上,更好的方法是按需推資料——當消費者需要的時候再推。我們可以實作

read()

函數:

const inStream = newReadable({

 read(size) {

   // there is a demand on the data... Someone wants to read it.

 }

});

當一個可讀流的

read

方法被調用的時候,我們應該可以把資料推入到一個隊列中。例如,我們可以一次推入一個字母,從 ASCII 65(也就是大寫字母

A

)開始,然後每次都推入下一個字母:

const inStream = newReadable({

 read(size) {

   this.push(String.fromCharCode(this.currentCharCode++));

   if(this.currentCharCode > 90) {

     this.push(null);

   }

 }

});

inStream.currentCharCode = 65;

inStream.pipe(process.stdout);

這樣一來,當有消費者來讀取該可讀流的時候,

read

函數會一直被調用,這樣一來我們就推入了更多的字母。我們再在最後結束這個循環就好了,也就是在

currentCharCode

大于 90(也就是大寫字母

Z

)的時候

push

進去一個

null

這段代碼等效于我們再先前寫的更簡單的那段,隻不過這一次我們是按需推入資料。事實上我們就應該按需推入。

實作雙工 / 變形金剛流

在雙工流中,我們既可以實作可讀流,也可以實作可寫流。其實就相當于我們從兩個流類型中一起繼承出來。

這裡有一個雙工流的樣例,把之前的可讀流和可寫流樣例給結合起來了:

const{ Duplex} = require('stream');

const inoutStream = newDuplex({

 write(chunk, encoding, callback) {

   console.log(chunk.toString());

   callback();

 },

 read(size) {

   this.push(String.fromCharCode(this.currentCharCode++));

   if(this.currentCharCode > 90) {

     this.push(null);

   }

 }

});

inoutStream.currentCharCode = 65;

process.stdin.pipe(inoutStream).pipe(process.stdout);

糅雜了這些方法後,我們可以用這個雙工流去讀取

A

Z

的字母,然後也可以用做 echo。我們将可讀流

stdin

pipe

到這個雙工流中來進行 echo,然後再将雙工流再給連接配接到可寫流

stdout

中,我們就可以看到

A

Z

的輸出了。

敲黑闆,重點是我們要了解雙工流的讀寫是完全獨立操作的,它隻是将可讀流和可寫流的特征給糅雜到一個對象中。

變形金剛流則更有意思了,它的輸出是經過計算的自身輸入。

對于變形金剛流來說,我們不需要實作

read

或者

write

方法,我們隻需要實作

transform

方法就好了——它是一個糅雜方法。它既有

write

方法的特征,又可以在裡面

push

資料。

這是一個簡單的變形金剛流,它會把流入的資料全部大寫化後再輸出出來:

const{ Transform} = require('stream');

const upperCaseTr = newTransform({

 transform(chunk, encoding, callback) {

   this.push(chunk.toString().toUpperCase());

   callback();

 }

});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

這個變形金剛流中,我們隻實作了

transform()

。在該方法中,我們将

chunk

給轉換成大寫字元串,然後将其

push

給自身可讀流的部分。

流對象模式

預設情況下,流接受 Buffer 和字元串類型的資料。不過有一個

objectMode

參數,我們可以通過設定它來使得流接受 JavaScript 對象。

下面是一個簡單的例子。例子中是一個變形金剛流,它将接收到的以逗号分隔的字元串給轉換成一個對象。如:

"a,b,c,d"

轉換為

{a:b,c:d}

const{ Transform} = require('stream');

const commaSplitter = newTransform({

 readableObjectMode: true,

 transform(chunk, encoding, callback) {

   this.push(chunk.toString().trim().split(','));

   callback();

 }

});

const arrayToObject = newTransform({

 readableObjectMode: true,

 writableObjectMode: true,

 transform(chunk, encoding, callback) {

   const obj = {};

   for(let i=0; i < chunk.length; i+=2) {

     obj[chunk[i]] = chunk[i+1];

   }

   this.push(obj);

   callback();

 }

});

const objectToString = newTransform({

 writableObjectMode: true,

 transform(chunk, encoding, callback) {

   this.push(JSON.stringify(chunk) + '\n');

   callback();

 }

});

process.stdin

 .pipe(commaSplitter)

 .pipe(arrayToObject)

 .pipe(objectToString)

 .pipe(process.stdout)

我們将輸入資料(例如

"a,b,c,d"

)給傳入

commaSplitter

,它會将字元串分割成數組(

["a","b","c","d"]

)來作為自身資料。為其加上

readableObjectMode

标記來使得其可以接受對象作為

push

的參數。

接下去,我們将這個數組給

pipe

arrayToObject

流中。這裡我們則需要

writableObjectMode

标記,使其能接受對象作為輸入資料。此外,我們還需要在裡面将數組給轉換為字元串并

push

,是以還需要為其加上

readableObjectMode

标記。最後一個

objectToString

流接受一個對象,但輸出的是字元串,是以我們隻需要

writableObjectMode

标記就好了,畢竟可讀流部分是一個普通字元串(一個被

stringify

後的對象)。

node createwritestream 寫入不覆寫原内容_[譯] 你所需要知道的關于 Node.js Streams 的一切...

Node.js 的内置變形金剛流

Node.js 内置了一些很有用的變形金剛流。點一下名,如 zlib 和 crypto。

下面是一個使用

zlib.createGzip

fs

的可讀/可寫流結合起來寫的一個檔案壓縮腳本:

const fs = require('fs');

const zlib = require('zlib');

const file = process.argv[2];

fs.createReadStream(file)

 .pipe(zlib.createGzip())

 .pipe(fs.createWriteStream(file + '.gz'));

我們可以使用這個腳本來壓縮傳入參數中所指明的檔案。我們将一個檔案的可讀流給 pipe 到 zlib 的内置變形金剛流中,然後将其傳入可寫流中去,這樣就出來了一個新的壓縮後檔案。Easy。

特别膩害的是,我們可以在必要時候為其加上一些事件。例如我想使用者看到壓縮的進度條,然後在壓縮完成的時候看到 “Done” 字樣。由于

pipe

方法傳回的是目标流,是以我們就可以鍊式調用,并在期間加上監聽:

const fs = require('fs');

const zlib = require('zlib');

const file = process.argv[2];

fs.createReadStream(file)

 .pipe(zlib.createGzip())

 .on('data', () => process.stdout.write('.'))

 .pipe(fs.createWriteStream(file + '.zz'))

 .on('finish', () => console.log('Done'));

雖然跟

pipe

函數一起搞事情的話,我們可以非常友善地消費流,但是我們想要一些額外功能的時候,就需要用到事件了。

pipe

還有很酷的一點就是,我們可以通過這種形式來逐條組合我們的代碼,使其友善閱讀。例如,我們用另外一種姿勢去實作上述代碼,也就是建立一個變形金剛流去彙報進度,然後把

.on()

替換成

.pipe()

const fs = require('fs');

const zlib = require('zlib');

const file = process.argv[2];

const{ Transform} = require('stream');

const reportProgress = newTransform({

 transform(chunk, encoding, callback) {

   process.stdout.write('.');

   callback(null, chunk);

 }

});

fs.createReadStream(file)

 .pipe(zlib.createGzip())

 .pipe(reportProgress)

 .pipe(fs.createWriteStream(file + '.zz'))

 .on('finish', () => console.log('Done'));

這個

reportProgress

流是一個簡單的即傳即走流,但它也能正常地将進度輸出到标準輸出。注意看我給

callback()

的第二個參數傳入了輸入資料

chunk

。這等同于先

push

callback

組合流的用法是無止境的。例如我想要在壓縮檔案前先加密檔案,以及在解壓檔案後再解密檔案,我們所需要做的就是将其再 pipe 到另一個變形金剛流中。我們可以這樣用 Node.js 的

crypto

子產品:

const crypto = require('crypto');

// ...

fs.createReadStream(file)

 .pipe(zlib.createGzip())

 .pipe(crypto.createCipher('aes192', 'a_secret'))

 .pipe(reportProgress)

 .pipe(fs.createWriteStream(file + '.zz'))

 .on('finish', () => console.log('Done'));

上面的腳本在壓縮後再對檔案加了個密,有

secret

的人才能使用輸出檔案。我們不能使用普通的 unzip 方式去解壓該檔案——畢竟被加密了。

我們需要使用一個相反的順序和流對上述腳本生成的檔案進行解壓,這也很簡單:

fs.createReadStream(file)

 .pipe(crypto.createDecipher('aes192', 'a_secret'))

 .pipe(zlib.createGunzip())

 .pipe(reportProgress)

 .pipe(fs.createWriteStream(file.slice(0, -3)))

 .on('finish', () => console.log('Done'));

假設我們傳入的就是一個對的壓縮檔案,那麼上述代碼就會為其建立一個可讀流,然後傳給

crypto.createDecipher

建立的流,将其輸出傳入

zlib.createGunzip()

建立的流中,最後把内容寫回一個另一個檔案,其檔案名是将傳入的檔案名

*.zz

字尾去掉。

這就是我在本文内容中所要講述的所有内容了。感謝閱讀!下次見!

▼往期精彩回顧▼入門 Node.js Net 子產品建構 TCP 網絡服務Node.js DNS (域名伺服器) 子產品Node.js 中實踐 Redis Lua 腳本Node.js 核心子產品  Events Node.js 中實踐基于 Redis 的分布式鎖實作分享 10 道 Nodejs EventLoop 和事件相關面試題Docker 容器環境下 Node.js 應用程式的優雅退出Node.js 服務 Docker 容器化應用實踐 Node.js 是什麼?我為什麼選擇它?分享 10 道 Nodejs 程序相關面試題Node.js進階之程序與線程Node.js 中的緩沖區(Buffer)究竟是什麼?Node.js 記憶體管理和 V8 垃圾回收機制淺談 Node.js 子產品機制及常見面試問題解答

node createwritestream 寫入不覆寫原内容_[譯] 你所需要知道的關于 Node.js Streams 的一切...
node createwritestream 寫入不覆寫原内容_[譯] 你所需要知道的關于 Node.js Streams 的一切...

在看點這裡

node createwritestream 寫入不覆寫原内容_[譯] 你所需要知道的關于 Node.js Streams 的一切...