1.可寫流createWriteStream的使用
1.1 建立可寫流
const ws = fs.createWriteStream(path.resolve(__dirname, 'test.txt'), {
flags: 'w', // 寫流不能用r,會報錯.可以用'a'表示追加
encoding: 'utf8', // 不寫預設是utf8
autoClose: true, // 寫完是否自動關閉
// start: 0, //從第幾個位元組開始寫
highWaterMark: 3 // 這是安全線,預設寫的水位線是16k,即16 * 1024。表示總共寫入的大小在這個範圍内表示安全的,因為這代表着連結清單緩存區的大小,如果滿了,那麼就需要rs.pause()暫停讀取,等緩存區的内容開始寫入,留出一部分空間後,再去寫入。如果超過這個值,雖然不會阻止你寫入,但是會告訴你已經超了這條安全線了。特别注意:超過這個水位線也沒事,隻是超過會ws.write()會傳回給你false,不超過傳回的是true
})
- 特别注意這個highWaterMark,所代表的意思就是"水位線",預設大小是16k,那"水位線"是具體什麼意思?
- 按照它源碼中的寫法,這個highWaterMark的值就代表 目前寫入的位元組 + 緩存中的位元組數 的和,如果超過這個值,雖然不會阻止你繼續往緩存中去寫入,但是
的傳回值會變為false,不超過的話,傳回的是true。這樣做的目的,是為了能夠做到讀一部分,寫一部分,不會導緻記憶體爆滿。可讀流的write()方法
- 按照它源碼中的寫法,這個highWaterMark的值就代表 目前寫入的位元組 + 緩存中的位元組數 的和,如果超過這個值,雖然不會阻止你繼續往緩存中去寫入,但是
- 傳回值ws代表檔案可寫流對象
1.2 可寫流監聽的一些列常用事件
- open事件
- drain事件
1.2.1 open事件
ws.on('open', (fd) => {
console.log(fd); // 檔案描述符
})
1.2.2 drain事件
- 觸發
事件需要同時滿足兩個條件drain
- 當正在寫入的資料的位元組數 + 緩存中的位元組數 之和,超過highWaterMark
- 将這些資料(正在寫入和緩存中的)寫入完畢
ws.on('drain', () => {
console.log('ok')
})
1.3 可寫流的常用方法
- write()
- close()
- end()
1.3.1 可寫流write()方法
- 隻能寫入 string 或 buffer,源碼中都會統一轉成buffer
- 傳回值代表本次寫入時,正在寫入的資料位元組數 + 緩存位元組數 之和,是否超過highWaterMark,超過傳回false,不超過傳回true
let flag = ws.write('1',() => { // 傳回值flag表示總共寫入的大小是否超出了highWaterMark,超出了就是false,主要用來限制是否要繼續讀取,值為false表示目前連結清單緩存區滿了,需要等一等再寫,這時候可以調用rs.pause()暫停讀取,那麼也就不會寫入緩存區了,等緩存區的資料真正開始寫入檔案了,那麼這時候緩存區就有空間了,那麼flag就變為true了,表示可以繼續寫入了
console.log(1)
}); // 隻能寫入 string 或者 buffer類型
1.3.2 可寫流的close()方法
- 代表關閉可寫流
1.3.3 可寫流的end()方法
- 該方法參數可傳可不傳:
- 傳了參數,等價于 先調用ws.write(内容)把内容寫入,然後調用ws.close()
- 不傳參數,等價于直接調用ws.close()
- 注意:
- 如果連續多次使用ws.end(有參數)方法,如果連續end()方法都有參數,那麼會報錯,因為第一次end(有參數)就代表寫入内容,并關閉檔案,第二次再end(有參數),内容就無法寫入了,因為已經關閉檔案了,是以會報錯。
- 但是如果ws.end(有參數)後面調用沒有參數的ws.end(),是沒關系的,因為第一次雖然關閉了,但是第二次ws.end()沒有參數,隻是又做了一次關閉操作,重複關閉不會有問題。
ws.end('ok1') // 等價于 先調用ws.write('ok')把ok寫入,然後調用ws.close()
// ws.end('ok2') // 這樣寫會報錯,因為close後不能再write
ws.end() // 如果不傳參數,是可以的,相當于又調用了一次ws.close()。沒有調用write
1.4 可寫流的特點
- 異步并發串行寫入
- 比如:同時調用ws.write(),分别寫入1、2、3、4,那麼調用的時候是并發的,也就是說,會去寫1,但是不能同時寫2、3、4, 2、3、4要放在緩存區中,等1寫完了,再從緩存區拿出2去寫,寫完拿出3,3寫完拿出4去寫,一直到緩存區寫完。第一次寫入是真的向檔案中寫,後續的操作都緩存到連結清單中了。
- 緩存區是用連結清單來實作的
- 因為如果資料比較多的話,假如用的是數組,取出第一個,後面每一個都會往上移一位,消耗性能比較大。
- 是以,可以用連結清單來實作棧或者隊列,取資料的頭部的時候,性能會比數組高一些。因為連結清單的指針始終指向資料的頭部,當頭部取出的時候,隻需要移動指針,移向下一位就可以了。
1.5 可寫流 與 檔案可寫流(原理)
- 檔案可寫流 是繼承 Writable類的,Writable類繼承了events子產品來實作訂閱釋出。像on(‘opne’)、on(‘drain’)事件都是通過writable的this.emit()來實作的。
- 可寫流本身沒有fs的操作,檔案可寫流在繼承了可寫流Writable類後,又實作了fs相關操作。
- 比如當我們建立一個檔案可寫流的時候,就會調用fs.open()打開檔案,拿到檔案描述符fd。
- 然後如果使用者去調用write方法,那麼就會其實并不是調用檔案可寫流這個類的write方法,其實調用的是可讀流Writable類下的write方法,Writable類下的write方法再去調用檔案可讀流類下的_write()方法,源碼中是doWrite方法。這個_write()方法内部才是真正的調用fs.write()去一部分一部分寫檔案。
- 當然,在這個寫檔案的過程中,涉及到一個緩存區,跟highWaterMark有關,什麼意思?具體就是:當我多次調用write方法的時候,雖然write是異步的,但是其實我寫入的順序是同步的,這是通過緩存區做到的。也就是說,當我多次調用write()方法的時候,會做判斷,如果是第一次調用,那麼就直接寫入檔案,後面幾次調用的write()方法的内容都放到緩存區中,等第一次寫完後,再去緩存區中讀取并寫入,這樣一次次将緩存區寫完。當多次調用write()方法的資料的總長度,等于或超過highWaterMark值,并且将緩存區資料全局寫入檔案時,會觸發drain事件,代表抽幹了,可以繼續寫入了。檔案可寫流就是這樣一批批寫入資料的。
- 手寫檔案可寫流這個類
const fs = require('fs');
const EventEmitter = require('events');
const Queue = require('./7.連結清單封裝成隊列')
class Writable extends EventEmitter {
constructor(options){
console.log(options, 111111)
super();
this.len = 0; // 每次寫入的總長度(正在寫入 + 緩存的長度),緩存空了後就為0了
}
write(chunk, encoding = this.encoding, cb = () => {}){ // 這個write是使用者調用的
// 這裡需要判斷是真的寫入還是放到緩存中
// 使用者調用write時,寫入的資料可能是string或者Buffer,是以統一轉成buffer
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
this.len += chunk.length;
let ret = this.len < this.highWaterMark;
if (!ret) { // 如果長度大于等于水位線,就改變傳回值類型——ws.write()方法的傳回值類型
this.needDrain = true;
}
if (this.writing) { // 如果目前正在寫,那麼就放到緩存中去
this.cache.offer({
chunk,
encoding,
cb
})
} else { // 如果沒有正在寫,就代表可以去寫
this.writing = true; // 表示現在開始,正在寫了
this._write(chunk, encoding, () => { // 這裡的回調函數 不是 使用者ws.write()的回調函數
cb(); // 使用者的回調要執行
this.clearBuffer(); // 清空緩存
})
}
return ret;
}
clearBuffer(){ // 多個異步并發 可以靠隊列來實作,依次清空隊列
let data = this.cache.poll().element; // 緩存中删除第一個,然後拿到第一個資料
if (data) {
let { chunk, encoding, cb } = data;
this._write(chunk, encoding, () => {
cb(); // 使用者的回調要執行
this.clearBuffer(); // 清空緩存
});
} else {
this.writing = false; // 緩存中的内容也寫入了 清空緩存
if (this.needDrain) { // 當不寫了,要判斷下是否需要出發drain,this.needDrain代表着資料長度有沒有超過highWaterMark
this.needDrain = false; // 如果需要觸發drain,那麼先置為false,再觸發
this.emit('drain'); // 觸發drain
}
}
}
}
class WriteStream extends Writable {
constructor(path, options){
super(options);
this.path = path;
this.flags = options.flags || 'r';
this.encoding = options.encoding || 'utf8';
this.highWaterMark = options.highWaterMark || 16 * 1024;
if (typeof this.autoClose === 'undefined') {
this.autoClose = true;
} else {
this.autoClose = options.autoClose;
}
this.open();
// 要判斷是第一次寫入,還是第二次寫入
this.writing = false; // 用來描述目前是否有正在寫入的操作
this.needDrain = false; // 是否觸發drain事件,預設false不觸發。觸發條件:寫入的長度大于等于highWaterMark,并且寫完,也就是this.writing為false
this.offset = 0; // 每次寫時入的偏移量
this.cache = new Queue(); // 緩存區,先用數組,後面再改成連結清單
}
open(){
fs.open(this.path, this.flags, (err, fd) => {
if (err) this.emit('error', err);
this.fd = fd;
this.emit('open', fd);
})
}
_write(chunk, encoding, callback){ // 這個相當于fs.write 等價于 源碼中的doWrite
// debugger
if (typeof this.fd !== 'number') {
return this.once('open', () => {
this._write(chunk, encoding, callback)
})
}
fs.write(this.fd, chunk, 0, chunk.length, this.offset, (err, written) => {
this.len -= written; // 寫完len要減少
this.offset += written; // 寫完偏移量要增加
callback();
})
}
}
module.exports = WriteStream;
上面所涉及的緩存區是連結清單的形式,連結清單的緩存區是這樣實作可看我寫的關于連結清單的文章(也就是下一篇)
- 使用自己寫的檔案可寫流
/*
fs.createWriteStream方法的源碼邏輯
1. 格式化傳入的資料,預設需要打開檔案
2. 使用者會調用write方法,這個方法資料Writeable類實作的write方法,内部會調用_write方法,該方法内部就是fs.write方法
3. 區分是第一次寫入還是後續寫入,第一次寫入是真的往檔案中寫,後續寫入是往連結清單緩存區中寫
*/
const fs = require('fs')
const path = require('path')
const WriteStream = require('./WriteStream.js')
// const ws = fs.createWriteStream(path.resolve(__dirname, 'text.txt'), {
const ws = new WriteStream(path.resolve(__dirname, 'text.txt'), {
flags: 'w', // 寫流不能用r,會報錯.可以用'a'表示追加
encoding: 'utf8', // 不寫預設是utf8
autoClose: true, // 寫完是否自動關閉
// start: 0, //從第幾個位元組開始寫
highWaterMark: 1
})
ws.on('open', function(fd){
console.log(fd);
})
// ws.write('y');
// ws.write('h');
let flag = ws.write('y');
console.log(flag);
flag = ws.write('h');
console.log(flag);
ws.on('drain', () => {
console.log('抽幹了')
})
1.6 關于面試題
1.6.1 并發多個ws.write()方法,為什麼異步并發會串行寫入?( 面試題
)
面試題
- 因為檔案可寫流,寫入的時候會先寫到一個連結清單實作的緩存區中,等一個寫完後,再從緩存區的頭部拿出一個寫入。
- 舉個例子:
- 同時調用ws.write(),分别寫入1、2、3、4,那麼調用的時候是并發的,也就是說,會去寫1,但是不能同時寫2、3、4,
- 2、3、4要放在緩存區中,等1寫完了,再從緩存區拿出2去寫,寫完拿出3,3寫完拿出4去寫,一直到緩存區寫完。
- 第一次寫入是真的向檔案中寫,後續的操作都緩存到連結清單中了
1.6.2 繼續問:為什麼緩存區需要連結清單實作,而不用數組來實作?( 面試題
)
面試題
- 因為如果資料比較多的話,假如用的是數組,取出第一個,後面每一個都會往上移一位,消耗性能比較大。
- 是以,可以用連結清單來實作棧或者隊列,取資料的頭部的時候,性能會比數組高一些。因為連結清單的指針始終指向資料的頭部,當頭部取出的時候,隻需要移動指針,移向下一位就可以了。
- 當然上面說的是單向清單,連結清單的類型有很多種:單向、雙向、環形、循環清單等等。(這個不用答,多答多問多錯)
1.6.3 繼續問:ws.write()這個write方法傳回值是什麼?有什麼含義?( 面試題
)
面試題
- 首先,傳回值是一個布爾值
- 表示總共寫入的大小是否超出了highWaterMark,主要用來表示是否要繼續讀取
- 具體的意思就是:
- 當我們寫入的時候,也就是調用write()方法的時候,如果并發多次調用,第一次調用會寫入檔案,後面的會先放到一個連結清單結構的緩存區中,等檔案寫入完一個,再從緩存區中讀取一個開始寫入,一點點把緩存區讀完。
- 那麼,當傳回值為false表示目前連結清單緩存區滿了,需要等一等再寫,這時候可以調用rs.pause()暫停讀取,那麼也就不會寫入緩存區了,等緩存區的資料真正開始寫入檔案了,那麼這時候緩存區就有空間了,那麼flag就變為true了,表示可以繼續寫入了
1.6.4 多個異步并發怎麼變成串行?( 面試題
)
面試題
- 将并發内容放入隊列,一個個執行
- 比如檔案可寫流,多次同時調用write()方法寫入内容,就是第一次真正将内容寫入檔案,後面幾次都放到緩存隊列中,然後等第一次寫完,從緩存中取出一個寫入,這樣一個個取出來寫入,直至寫完。這個過程多次同時調用write方法就是異步并發的,但是放入緩存後,一個個寫入就成了串行的了。
- 當然,實際源碼中,緩存區本質是一個連結清單。