系统性Node.js——手写文件流( 三 )


【系统性Node.js——手写文件流】这里大家可能会对 fs.open , ````fs.read , EventEmitter``` 不太熟悉 , 可以从前文回顾里看下我之前的文章 , 这些都是有讲到的 。
文件可写流顾名思义了 , 将内容一点一点写入到文件里去 。
使用方式// 使用方式 1:const ws = fs.createWriteStream('./w-test.js')// 使用方式 2:const ws = new WriteStream('./w-test.js', {flags: 'w',encoding: 'utf8',autoClose: true,highWaterMark: 2})// 写入文件const flag = ws.write('2')ws.on('drain', () => console.log('drain'))

  • ws.write() 写入文件 。 这里有一个返回值 , 代表是否已经达到最大缓存 。 当我们同步连续调用多次 write()时 , 并不是每次调用都立即写入文件 , 而是同一时间只能执行一次写入操作 , 所以剩下的会被写入到缓存中 , 等上一次写入完毕后再从缓存中依次取出执行 。 所以 , 这时就会有一个最大的缓存大小 , 默认为 64kb 。 而这里的返回值则代表 , 是否还可以继续写入 , 也就是:是否达到了最大缓存 。 true 代表可以继续写入 。
  • ws.on('drain') , 如果调用ws.write()返回 false , 则当可以继续写入数据到流时会触发 'drain' 事件 。
手写文件可写流接下来我们手写 WriteStream
初始化还是老套路 , 先定义 WriteStream 类 , 并继承 EventEmitter
然后 , 初始化参数 。 注意看代码注释
const { EventEmitter } = require('events')const fs = require('fs')class WriteStream extends EventEmitter {constructor(path, options = {}) {super()this.path = paththis.flags = options.flags ?? 'w'this.encoding = options.encoding ?? 'utf8'this.autoClose = options.autoClose ?? truethis.highWaterMark = options.highWaterMark ?? 16 * 1024this.offset = 0 // 文件读取偏移量this.cache = [] // 缓存的要被写入的内容// 将要被写入的总长度 , 包括缓存中的内容长度this.writtenLen = 0// 是否正在执行写入操作 ,// 如果正在写入 , 那以后的操作需放入 this.catchethis.writing = false// 是否应该触发 drain 事件this.needDrain = false// 打开文件this.open()}}open()open 方法没啥好说的了 , 跟 ReadStream 一样的代码 。
open() {fs.open(this.path, this.flags, (err, fd) => {if (err) {this.emit('error', err)return}this.fd = fdthis.emit('open')})}write()最关键的方法 , 执行写入操作 , 先看下代码 。
每行代码都有注释 , 注意看~~~
write(chunk, encoding, cb = () => {}) {// 初始化被写入的内容// 如果时字符串 , 则转为 bufferchunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding)// 计算要被写入的长度this.writtenLen += chunk.length// 判断是否已经超过 highWaterMarkconst hasLimit = this.writtenLen >= this.highWaterMark// 是否需要触发 drain// 如果超过 highWaterMark , 则代表需要触发this.needDrain = hasLimit// 如果没有正在写入的内容 , 则调用 _write 直接开始写入// 否则放入 catch 中// 写入完成后 , 调用 clearBuffer , 从缓存中拿取最近一次内容开始写入if (!this.writing) {this.writing = truethis._write(chunk, () => {cb()this.clearBuffer()})} else {this.cache.push({chunk: chunk,cb})}return !hasLimit}// 写入操作_write(chunk, cb) {if (typeof this.fd !== 'number') {this.once('open', () => this._write(chunk, cb))return}fs.write(this.fd, chunk, 0, chunk.length, this.offset, (err, bytesWritten) => {if (err) {this.emit('error', err)return}// 计算偏移量this.offset += bytesWritten// 写入完毕 , 则减去当前写入的长度this.writtenLen -= bytesWrittencb()})}


推荐阅读