天天看點

10.檔案可寫流fs.createWriteStream

1.可寫流createWriteStream的使用

1.1 建立可寫流

const ws = fs.createWriteStream(path.resolve(__dirname, 'test.txt'), {
  flags: 'w', // 寫流不能用r,會報錯.可以用'a'表示追加
  encoding: 'utf8', // 不寫預設是utf8
  autoClose: true, // 寫完是否自動關閉
  // start: 0, //從第幾個位元組開始寫
  highWaterMark: 3 // 這是安全線,預設寫的水位線是16k,即16 * 1024。表示總共寫入的大小在這個範圍内表示安全的,因為這代表着連結清單緩存區的大小,如果滿了,那麼就需要rs.pause()暫停讀取,等緩存區的内容開始寫入,留出一部分空間後,再去寫入。如果超過這個值,雖然不會阻止你寫入,但是會告訴你已經超了這條安全線了。特别注意:超過這個水位線也沒事,隻是超過會ws.write()會傳回給你false,不超過傳回的是true
})
           
  • 特别注意這個highWaterMark,所代表的意思就是"水位線",預設大小是16k,那"水位線"是具體什麼意思?
    • 按照它源碼中的寫法,這個highWaterMark的值就代表 目前寫入的位元組 + 緩存中的位元組數 的和,如果超過這個值,雖然不會阻止你繼續往緩存中去寫入,但是

      可讀流的write()方法

      的傳回值會變為false,不超過的話,傳回的是true。這樣做的目的,是為了能夠做到讀一部分,寫一部分,不會導緻記憶體爆滿。
  • 傳回值ws代表檔案可寫流對象

1.2 可寫流監聽的一些列常用事件

  • open事件
  • drain事件
1.2.1 open事件
ws.on('open', (fd) => {
  console.log(fd); // 檔案描述符
})
           
1.2.2 drain事件
  • 觸發

    drain

    事件需要同時滿足兩個條件
    • 當正在寫入的資料的位元組數 + 緩存中的位元組數 之和,超過highWaterMark
    • 将這些資料(正在寫入和緩存中的)寫入完畢
ws.on('drain', () => {
  console.log('ok')
})
           

1.3 可寫流的常用方法

  • write()
  • close()
  • end()
1.3.1 可寫流write()方法
  • 隻能寫入 string 或 buffer,源碼中都會統一轉成buffer
  • 傳回值代表本次寫入時,正在寫入的資料位元組數 + 緩存位元組數 之和,是否超過highWaterMark,超過傳回false,不超過傳回true
let flag = ws.write('1',() => { // 傳回值flag表示總共寫入的大小是否超出了highWaterMark,超出了就是false,主要用來限制是否要繼續讀取,值為false表示目前連結清單緩存區滿了,需要等一等再寫,這時候可以調用rs.pause()暫停讀取,那麼也就不會寫入緩存區了,等緩存區的資料真正開始寫入檔案了,那麼這時候緩存區就有空間了,那麼flag就變為true了,表示可以繼續寫入了
  console.log(1)
}); // 隻能寫入 string 或者 buffer類型
           
1.3.2 可寫流的close()方法
  • 代表關閉可寫流
1.3.3 可寫流的end()方法
  • 該方法參數可傳可不傳:
    • 傳了參數,等價于 先調用ws.write(内容)把内容寫入,然後調用ws.close()
    • 不傳參數,等價于直接調用ws.close()
  • 注意:
    • 如果連續多次使用ws.end(有參數)方法,如果連續end()方法都有參數,那麼會報錯,因為第一次end(有參數)就代表寫入内容,并關閉檔案,第二次再end(有參數),内容就無法寫入了,因為已經關閉檔案了,是以會報錯。
    • 但是如果ws.end(有參數)後面調用沒有參數的ws.end(),是沒關系的,因為第一次雖然關閉了,但是第二次ws.end()沒有參數,隻是又做了一次關閉操作,重複關閉不會有問題。
ws.end('ok1') // 等價于 先調用ws.write('ok')把ok寫入,然後調用ws.close()
// ws.end('ok2') // 這樣寫會報錯,因為close後不能再write
ws.end() // 如果不傳參數,是可以的,相當于又調用了一次ws.close()。沒有調用write
           

1.4 可寫流的特點

  • 異步并發串行寫入
    • 比如:同時調用ws.write(),分别寫入1、2、3、4,那麼調用的時候是并發的,也就是說,會去寫1,但是不能同時寫2、3、4, 2、3、4要放在緩存區中,等1寫完了,再從緩存區拿出2去寫,寫完拿出3,3寫完拿出4去寫,一直到緩存區寫完。第一次寫入是真的向檔案中寫,後續的操作都緩存到連結清單中了。
  • 緩存區是用連結清單來實作的
    • 因為如果資料比較多的話,假如用的是數組,取出第一個,後面每一個都會往上移一位,消耗性能比較大。
    • 是以,可以用連結清單來實作棧或者隊列,取資料的頭部的時候,性能會比數組高一些。因為連結清單的指針始終指向資料的頭部,當頭部取出的時候,隻需要移動指針,移向下一位就可以了。

1.5 可寫流 與 檔案可寫流(原理)

  • 檔案可寫流 是繼承 Writable類的,Writable類繼承了events子產品來實作訂閱釋出。像on(‘opne’)、on(‘drain’)事件都是通過writable的this.emit()來實作的。
  • 可寫流本身沒有fs的操作,檔案可寫流在繼承了可寫流Writable類後,又實作了fs相關操作。
    • 比如當我們建立一個檔案可寫流的時候,就會調用fs.open()打開檔案,拿到檔案描述符fd。
    • 然後如果使用者去調用write方法,那麼就會其實并不是調用檔案可寫流這個類的write方法,其實調用的是可讀流Writable類下的write方法,Writable類下的write方法再去調用檔案可讀流類下的_write()方法,源碼中是doWrite方法。這個_write()方法内部才是真正的調用fs.write()去一部分一部分寫檔案。
      • 當然,在這個寫檔案的過程中,涉及到一個緩存區,跟highWaterMark有關,什麼意思?具體就是:當我多次調用write方法的時候,雖然write是異步的,但是其實我寫入的順序是同步的,這是通過緩存區做到的。也就是說,當我多次調用write()方法的時候,會做判斷,如果是第一次調用,那麼就直接寫入檔案,後面幾次調用的write()方法的内容都放到緩存區中,等第一次寫完後,再去緩存區中讀取并寫入,這樣一次次将緩存區寫完。當多次調用write()方法的資料的總長度,等于或超過highWaterMark值,并且将緩存區資料全局寫入檔案時,會觸發drain事件,代表抽幹了,可以繼續寫入了。檔案可寫流就是這樣一批批寫入資料的。
  • 手寫檔案可寫流這個類
const fs = require('fs');
const EventEmitter = require('events');
const Queue = require('./7.連結清單封裝成隊列')
class Writable extends EventEmitter {
  constructor(options){
    console.log(options, 111111)
    super();
    this.len = 0; // 每次寫入的總長度(正在寫入 + 緩存的長度),緩存空了後就為0了
  }
  write(chunk, encoding = this.encoding, cb = () => {}){ // 這個write是使用者調用的
    // 這裡需要判斷是真的寫入還是放到緩存中
    // 使用者調用write時,寫入的資料可能是string或者Buffer,是以統一轉成buffer
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
    this.len += chunk.length;
    let ret = this.len < this.highWaterMark;
    if (!ret) { // 如果長度大于等于水位線,就改變傳回值類型——ws.write()方法的傳回值類型
      this.needDrain = true;
    }
    if (this.writing) { // 如果目前正在寫,那麼就放到緩存中去
      this.cache.offer({
        chunk,
        encoding,
        cb
      })
    } else { // 如果沒有正在寫,就代表可以去寫
      this.writing = true; // 表示現在開始,正在寫了
      this._write(chunk, encoding, () => { // 這裡的回調函數 不是 使用者ws.write()的回調函數
        cb(); // 使用者的回調要執行
        this.clearBuffer(); // 清空緩存
      })
    }
    return ret;
  }
  clearBuffer(){ // 多個異步并發 可以靠隊列來實作,依次清空隊列
    let data = this.cache.poll().element; // 緩存中删除第一個,然後拿到第一個資料
    if (data) {
      let { chunk, encoding, cb } = data;
      this._write(chunk, encoding, () => {
        cb(); // 使用者的回調要執行
        this.clearBuffer(); // 清空緩存
      });
    } else {
      this.writing = false; // 緩存中的内容也寫入了 清空緩存
      if (this.needDrain) { // 當不寫了,要判斷下是否需要出發drain,this.needDrain代表着資料長度有沒有超過highWaterMark
        this.needDrain = false; // 如果需要觸發drain,那麼先置為false,再觸發
        this.emit('drain'); // 觸發drain
      }
    }
  }
}
class WriteStream extends Writable {
  constructor(path, options){
    super(options);
    this.path = path;
    this.flags = options.flags || 'r';
    this.encoding = options.encoding || 'utf8';
    this.highWaterMark = options.highWaterMark || 16 * 1024;
    if (typeof this.autoClose === 'undefined') {
      this.autoClose = true;
    } else {
      this.autoClose = options.autoClose;
    }

    this.open();

    // 要判斷是第一次寫入,還是第二次寫入
    this.writing = false; // 用來描述目前是否有正在寫入的操作
    this.needDrain = false; // 是否觸發drain事件,預設false不觸發。觸發條件:寫入的長度大于等于highWaterMark,并且寫完,也就是this.writing為false
    
    this.offset = 0; // 每次寫時入的偏移量
    this.cache = new Queue(); // 緩存區,先用數組,後面再改成連結清單
  }
  open(){
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) this.emit('error', err);
      this.fd = fd;
      this.emit('open', fd);
    })
  }
  _write(chunk, encoding, callback){ // 這個相當于fs.write 等價于 源碼中的doWrite
    // debugger
    if (typeof this.fd !== 'number') {
      return this.once('open', () => {
        this._write(chunk, encoding, callback)
      })
    }
    fs.write(this.fd, chunk, 0, chunk.length, this.offset, (err, written) => {
      this.len -= written; // 寫完len要減少
      this.offset += written; // 寫完偏移量要增加
      callback();
    })
  }
}

module.exports = WriteStream;
           
上面所涉及的緩存區是連結清單的形式,連結清單的緩存區是這樣實作可看我寫的關于連結清單的文章(也就是下一篇)
  • 使用自己寫的檔案可寫流
/*
  fs.createWriteStream方法的源碼邏輯
  1. 格式化傳入的資料,預設需要打開檔案
  2. 使用者會調用write方法,這個方法資料Writeable類實作的write方法,内部會調用_write方法,該方法内部就是fs.write方法
  3. 區分是第一次寫入還是後續寫入,第一次寫入是真的往檔案中寫,後續寫入是往連結清單緩存區中寫
*/

const fs = require('fs')
const path = require('path')
const WriteStream = require('./WriteStream.js')

// const ws = fs.createWriteStream(path.resolve(__dirname, 'text.txt'), {
const ws = new WriteStream(path.resolve(__dirname, 'text.txt'), {
  flags: 'w', // 寫流不能用r,會報錯.可以用'a'表示追加
  encoding: 'utf8', // 不寫預設是utf8
  autoClose: true, // 寫完是否自動關閉
  // start: 0, //從第幾個位元組開始寫
  highWaterMark: 1 
})

ws.on('open', function(fd){
  console.log(fd);
})
// ws.write('y');
// ws.write('h');
let flag = ws.write('y');
console.log(flag);
flag = ws.write('h');
console.log(flag);

ws.on('drain', () => {
  console.log('抽幹了')
})
           

1.6 關于面試題

1.6.1 并發多個ws.write()方法,為什麼異步并發會串行寫入?(

面試題

  • 因為檔案可寫流,寫入的時候會先寫到一個連結清單實作的緩存區中,等一個寫完後,再從緩存區的頭部拿出一個寫入。
  • 舉個例子:
    • 同時調用ws.write(),分别寫入1、2、3、4,那麼調用的時候是并發的,也就是說,會去寫1,但是不能同時寫2、3、4,
    • 2、3、4要放在緩存區中,等1寫完了,再從緩存區拿出2去寫,寫完拿出3,3寫完拿出4去寫,一直到緩存區寫完。
    • 第一次寫入是真的向檔案中寫,後續的操作都緩存到連結清單中了
1.6.2 繼續問:為什麼緩存區需要連結清單實作,而不用數組來實作?(

面試題

  • 因為如果資料比較多的話,假如用的是數組,取出第一個,後面每一個都會往上移一位,消耗性能比較大。
    • 是以,可以用連結清單來實作棧或者隊列,取資料的頭部的時候,性能會比數組高一些。因為連結清單的指針始終指向資料的頭部,當頭部取出的時候,隻需要移動指針,移向下一位就可以了。
    • 當然上面說的是單向清單,連結清單的類型有很多種:單向、雙向、環形、循環清單等等。(這個不用答,多答多問多錯)
1.6.3 繼續問:ws.write()這個write方法傳回值是什麼?有什麼含義?(

面試題

  • 首先,傳回值是一個布爾值
  • 表示總共寫入的大小是否超出了highWaterMark,主要用來表示是否要繼續讀取
  • 具體的意思就是:
    • 當我們寫入的時候,也就是調用write()方法的時候,如果并發多次調用,第一次調用會寫入檔案,後面的會先放到一個連結清單結構的緩存區中,等檔案寫入完一個,再從緩存區中讀取一個開始寫入,一點點把緩存區讀完。
    • 那麼,當傳回值為false表示目前連結清單緩存區滿了,需要等一等再寫,這時候可以調用rs.pause()暫停讀取,那麼也就不會寫入緩存區了,等緩存區的資料真正開始寫入檔案了,那麼這時候緩存區就有空間了,那麼flag就變為true了,表示可以繼續寫入了
1.6.4 多個異步并發怎麼變成串行?(

面試題

  • 将并發内容放入隊列,一個個執行
  • 比如檔案可寫流,多次同時調用write()方法寫入内容,就是第一次真正将内容寫入檔案,後面幾次都放到緩存隊列中,然後等第一次寫完,從緩存中取出一個寫入,這樣一個個取出來寫入,直至寫完。這個過程多次同時調用write方法就是異步并發的,但是放入緩存後,一個個寫入就成了串行的了。
  • 當然,實際源碼中,緩存區本質是一個連結清單。

繼續閱讀