You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							423 lines
						
					
					
						
							9.8 KiB
						
					
					
				
			
		
		
	
	
							423 lines
						
					
					
						
							9.8 KiB
						
					
					
				'use strict'
 | 
						|
const MiniPass = require('minipass')
 | 
						|
const EE = require('events').EventEmitter
 | 
						|
const fs = require('fs')
 | 
						|
 | 
						|
// Vectored write used to flush several queued buffers with a single call.
// Use the public fs.writev when the runtime provides it.
let writev = fs.writev
/* istanbul ignore next */
if (!writev) {
  // This entire block can be removed if support for earlier than Node.js
  // 12.9.0 is not needed.
  const binding = process.binding('fs')
  const FSReqWrap = binding.FSReqWrap || binding.FSReqCallback

  // Fallback with the same (fd, buffers, position, callback) call shape,
  // implemented directly on the internal fs binding.
  writev = (fd, iovec, pos, cb) => {
    const req = new FSReqWrap()
    // hand the buffer list back to the caller along with the result
    req.oncomplete = (er, bw) => cb(er, bw, iovec)
    binding.writeBuffers(fd, iovec, pos, req)
  }
}
 | 
						|
 | 
						|
// Private property and method keys. Symbols keep the streams' internal state
// and helper methods off the public (string-keyed) surface of the classes.

// state shared by the read and write streams
const _autoClose = Symbol('_autoClose')
const _errored = Symbol('_errored')
const _fd = Symbol('_fd')
const _path = Symbol('_path')

// internal methods shared by the read and write streams
const _close = Symbol('_close')
const _onerror = Symbol('_onerror')
const _onopen = Symbol('_onopen')
const _open = Symbol('_open')

// read-side state and methods
const _handleChunk = Symbol('_handleChunk')
const _makeBuf = Symbol('_makeBuf')
const _onread = Symbol('_onread')
const _read = Symbol('_read')
const _readSize = Symbol('_readSize')
const _reading = Symbol('_reading')
const _remain = Symbol('_remain')
const _size = Symbol('_size')

// write-side state and methods
const _defaultFlag = Symbol('_defaultFlag')
const _ended = Symbol('_ended')
const _finished = Symbol('_finished')
const _flags = Symbol('_flags')
const _flush = Symbol('_flush')
const _mode = Symbol('_mode')
const _needDrain = Symbol('_needDrain')
const _onwrite = Symbol('_onwrite')
const _pos = Symbol('_pos')
const _queue = Symbol('_queue')
const _write = Symbol('_write')
const _writing = Symbol('_writing')
 | 
						|
 | 
						|
/**
 * Readable file stream built on MiniPass.
 *
 * Opens `path` for reading (unless a numeric `opt.fd` is supplied) and writes
 * the file contents into the underlying MiniPass stream in chunks of at most
 * `opt.readSize` bytes.
 *
 * Options read here (the whole object is also handed to MiniPass):
 *   - fd        {number}  already-open descriptor to read from
 *   - readSize  {number}  maximum bytes per read, defaults to 16 MiB
 *   - size      {number}  total number of bytes to read, defaults to Infinity
 *   - autoClose {boolean} close the descriptor at end of file or on error,
 *                         defaults to true
 *
 * Emits 'open' with the descriptor when the stream opened it itself, and
 * 'close' once the descriptor has been closed.
 */
class ReadStream extends MiniPass {
  /**
   * @param {string} path file to read; must be a string even when opt.fd is set
   * @param {object} [opt] see the class description
   * @throws {TypeError} if path is not a string
   */
  constructor (path, opt) {
    opt = opt || {}
    super(opt)

    this.readable = true
    this.writable = false

    if (typeof path !== 'string')
      throw new TypeError('path must be a string')

    this[_errored] = false
    this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
    this[_path] = path
    this[_readSize] = opt.readSize || 16*1024*1024
    this[_reading] = false
    this[_size] = typeof opt.size === 'number' ? opt.size : Infinity
    this[_remain] = this[_size]
    this[_autoClose] = typeof opt.autoClose === 'boolean' ?
      opt.autoClose : true

    // a caller-supplied descriptor can be read right away
    if (typeof this[_fd] === 'number')
      this[_read]()
    else
      this[_open]()
  }

  /** Current file descriptor, or null when none is open. */
  get fd () { return this[_fd] }
  /** Path given to the constructor. */
  get path () { return this[_path] }

  /** Not supported: data only enters this stream from the file. */
  write () {
    throw new TypeError('this is a readable stream')
  }

  /** Not supported: the stream ends itself when the file is exhausted. */
  end () {
    throw new TypeError('this is a readable stream')
  }

  // open the file read-only and continue in [_onopen]
  [_open] () {
    fs.open(this[_path], 'r', (er, fd) => this[_onopen](er, fd))
  }

  // record the descriptor, announce it, and start reading
  [_onopen] (er, fd) {
    if (er)
      this[_onerror](er)
    else {
      this[_fd] = fd
      this.emit('open', fd)
      this[_read]()
    }
  }

  // buffer for the next read: never larger than what is left to read
  [_makeBuf] () {
    return Buffer.allocUnsafe(Math.min(this[_readSize], this[_remain]))
  }

  // issue one asynchronous read unless one is already in flight
  [_read] () {
    if (!this[_reading]) {
      this[_reading] = true
      const buf = this[_makeBuf]()
      /* istanbul ignore if */
      if (buf.length === 0)
        // nothing left to read: report a zero-byte read without touching fs
        return process.nextTick(() => this[_onread](null, 0, buf))
      fs.read(this[_fd], buf, 0, buf.length, null, (er, br, buf) =>
        this[_onread](er, br, buf))
    }
  }

  // read completion: keep reading while [_handleChunk] says to continue
  [_onread] (er, br, buf) {
    this[_reading] = false
    if (er)
      this[_onerror](er)
    else if (this[_handleChunk](br, buf))
      this[_read]()
  }

  // close the descriptor (only when autoClose is on) and emit 'close',
  // or 'error' if closing fails
  [_close] () {
    if (this[_autoClose] && typeof this[_fd] === 'number') {
      const fd = this[_fd]
      // clear first so the descriptor is never closed twice
      this[_fd] = null
      fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
    }
  }

  [_onerror] (er) {
    // leave the reading flag set so that [_read] becomes a no-op from now on
    this[_reading] = true
    this[_close]()
    this.emit('error', er)
  }

  /**
   * Push `br` bytes of `buf` downstream.
   * Returns the result of the downstream write, or false once the file is
   * exhausted (zero-byte read) or the requested size has been reached, in
   * which case the descriptor is closed and the stream is ended.
   */
  [_handleChunk] (br, buf) {
    let ret = false
    // no effect if infinite
    this[_remain] -= br
    if (br > 0)
      ret = super.write(br < buf.length ? buf.slice(0, br) : buf)

    if (br === 0 || this[_remain] <= 0) {
      ret = false
      this[_close]()
      super.end()
    }

    return ret
  }

  /**
   * Event filter in front of MiniPass#emit.
   *   - 'prefinish' / 'finish' are swallowed
   *   - 'drain' is swallowed, and triggers the next read while a descriptor
   *     is open
   *   - 'error' is emitted at most once per stream
   * Every other event is forwarded with all of its arguments.
   *
   * @returns {boolean} false for swallowed events, otherwise whatever the
   *   parent emit returns
   */
  emit (ev, ...args) {
    switch (ev) {
      case 'prefinish':
      case 'finish':
        return false

      case 'drain':
        if (typeof this[_fd] === 'number')
          this[_read]()
        return false

      case 'error':
        if (this[_errored])
          return false
        this[_errored] = true
        return super.emit(ev, ...args)

      default:
        return super.emit(ev, ...args)
    }
  }
}
 | 
						|
 | 
						|
/**
 * Synchronous flavour of ReadStream: opening, reading and closing all use
 * the blocking fs calls, so errors are thrown to the caller instead of
 * being emitted. If any step throws, the descriptor is closed on the way out.
 */
class ReadStreamSync extends ReadStream {
  // open the file read-only and hand the descriptor straight to [_onopen]
  [_open] () {
    let completed = false
    try {
      this[_onopen](null, fs.openSync(this[_path], 'r'))
      completed = true
    } finally {
      // only reached with completed === false when something above threw
      if (!completed)
        this[_close]()
    }
  }

  // read chunk after chunk until [_handleChunk] asks to stop
  [_read] () {
    let completed = false
    try {
      if (!this[_reading]) {
        this[_reading] = true
        let keepReading = true
        while (keepReading) {
          const chunk = this[_makeBuf]()
          /* istanbul ignore next */
          const bytesRead = chunk.length === 0 ? 0
            : fs.readSync(this[_fd], chunk, 0, chunk.length, null)
          keepReading = this[_handleChunk](bytesRead, chunk)
        }
        this[_reading] = false
      }
      completed = true
    } finally {
      if (!completed)
        this[_close]()
    }
  }

  // close the descriptor synchronously, then announce it
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number')
      return

    const openFd = this[_fd]
    // clear first so the descriptor is never closed twice
    this[_fd] = null
    fs.closeSync(openFd)
    this.emit('close')
  }
}
 | 
						|
 | 
						|
/**
 * Writable file stream.
 *
 * Opens `path` (unless a numeric `opt.fd` is supplied) and writes the chunks
 * passed to write()/end(). Chunks that arrive while the file is still being
 * opened or while another write is in flight are queued and flushed
 * together afterwards.
 *
 * Options read here (the whole object is also handed to EventEmitter):
 *   - fd        {number}  already-open descriptor to write to
 *   - mode      {number}  mode passed to fs.open, defaults to 0o666
 *   - start     {number}  file position of the first write; when omitted the
 *                         position passed to fs is null
 *   - flags     {string}  flags passed to fs.open; defaults to 'r+' when
 *                         `start` is given (falling back to 'w' if the file
 *                         does not exist), otherwise 'w'
 *   - autoClose {boolean} close the descriptor on finish or error,
 *                         defaults to true
 *
 * Emits 'open' (with the descriptor), 'drain', 'finish', 'close' and 'error'.
 */
class WriteStream extends EE {
  /**
   * @param {string} path file to write
   * @param {object} [opt] see the class description
   */
  constructor (path, opt) {
    opt = opt || {}
    super(opt)
    this.readable = false
    this.writable = true
    this[_errored] = false
    this[_writing] = false
    this[_ended] = false
    // set before [_open] runs, like every other state flag
    this[_finished] = false
    this[_needDrain] = false
    this[_queue] = []
    this[_path] = path
    this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
    this[_mode] = opt.mode === undefined ? 0o666 : opt.mode
    this[_pos] = typeof opt.start === 'number' ? opt.start : null
    this[_autoClose] = typeof opt.autoClose === 'boolean' ?
      opt.autoClose : true

    // truncating makes no sense when writing into the middle
    const defaultFlag = this[_pos] !== null ? 'r+' : 'w'
    this[_defaultFlag] = opt.flags === undefined
    this[_flags] = this[_defaultFlag] ? defaultFlag : opt.flags

    if (this[_fd] === null)
      this[_open]()
  }

  /**
   * Same as EventEmitter#emit, except that 'error' is emitted at most once
   * per stream; later 'error' events are dropped.
   *
   * @returns {boolean} false for a dropped 'error', otherwise whatever
   *   EventEmitter#emit returns
   */
  emit (ev, ...args) {
    if (ev === 'error') {
      if (this[_errored])
        return false
      this[_errored] = true
    }
    return super.emit(ev, ...args)
  }

  /** Current file descriptor, or null when none is open. */
  get fd () { return this[_fd] }
  /** Path given to the constructor. */
  get path () { return this[_path] }

  [_onerror] (er) {
    this[_close]()
    // leave the writing flag set so that later write() calls only queue
    this[_writing] = true
    this.emit('error', er)
  }

  // open the file with the chosen flags and continue in [_onopen]
  [_open] () {
    fs.open(this[_path], this[_flags], this[_mode],
      (er, fd) => this[_onopen](er, fd))
  }

  [_onopen] (er, fd) {
    // the default 'r+' requires an existing file; if there is none, retry
    // with 'w' so that it gets created
    if (this[_defaultFlag] &&
        this[_flags] === 'r+' &&
        er && er.code === 'ENOENT') {
      this[_flags] = 'w'
      this[_open]()
    } else if (er)
      this[_onerror](er)
    else {
      this[_fd] = fd
      this.emit('open', fd)
      // write out whatever was queued while the file was opening
      this[_flush]()
    }
  }

  /**
   * Optionally write a final chunk, then mark the stream as ended.
   * 'finish' is emitted once everything queued has been written.
   *
   * @param {string|Buffer} [buf] final chunk
   * @param {string} [enc] encoding used when buf is a string
   * @returns {this}
   */
  end (buf, enc) {
    if (buf)
      this.write(buf, enc)

    this[_ended] = true

    // synthetic after-write logic, where drain/finish live
    if (!this[_writing] && !this[_queue].length &&
        typeof this[_fd] === 'number')
      this[_onwrite](null, 0)
    return this
  }

  /**
   * Write a chunk, or queue it if the file is not open yet or another write
   * is pending.
   *
   * @param {string|Buffer} buf chunk to write
   * @param {string} [enc] encoding used when buf is a string
   * @returns {boolean} true when the chunk was handed to fs right away;
   *   false when it was queued (wait for 'drain') or when called after end()
   */
  write (buf, enc) {
    if (typeof buf === 'string')
      buf = Buffer.from(buf, enc)

    if (this[_ended]) {
      this.emit('error', new Error('write() after end()'))
      return false
    }

    if (this[_fd] === null || this[_writing] || this[_queue].length) {
      this[_queue].push(buf)
      this[_needDrain] = true
      return false
    }

    this[_writing] = true
    this[_write](buf)
    return true
  }

  // write a single buffer at the current position
  [_write] (buf) {
    fs.write(this[_fd], buf, 0, buf.length, this[_pos], (er, bw) =>
      this[_onwrite](er, bw))
  }

  // write completion: advance the position, then flush the queue, finish,
  // or signal 'drain'
  [_onwrite] (er, bw) {
    if (er)
      this[_onerror](er)
    else {
      // the position is only tracked when an explicit start was given
      if (this[_pos] !== null)
        this[_pos] += bw
      if (this[_queue].length)
        this[_flush]()
      else {
        this[_writing] = false

        if (this[_ended] && !this[_finished]) {
          this[_finished] = true
          this[_close]()
          this.emit('finish')
        } else if (this[_needDrain]) {
          this[_needDrain] = false
          this.emit('drain')
        }
      }
    }
  }

  // write out everything that is queued: nothing, one buffer, or several
  // buffers in a single vectored write
  [_flush] () {
    if (this[_queue].length === 0) {
      // nothing queued; if end() was already called, run the finish logic
      if (this[_ended])
        this[_onwrite](null, 0)
    } else if (this[_queue].length === 1)
      this[_write](this[_queue].pop())
    else {
      const iovec = this[_queue]
      this[_queue] = []
      writev(this[_fd], iovec, this[_pos],
        (er, bw) => this[_onwrite](er, bw))
    }
  }

  // close the descriptor (only when autoClose is on) and emit 'close',
  // or 'error' if closing fails
  [_close] () {
    if (this[_autoClose] && typeof this[_fd] === 'number') {
      const fd = this[_fd]
      // clear first so the descriptor is never closed twice
      this[_fd] = null
      fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
    }
  }
}
 | 
						|
 | 
						|
/**
 * Synchronous flavour of WriteStream: opening, writing and closing all use
 * the blocking fs calls, so failures are thrown to the caller.
 */
class WriteStreamSync extends WriteStream {
  [_open] () {
    // only wrap in a try{} block if we know we'll retry, to avoid
    // the rethrow obscuring the error's source frame in most cases.
    const mayRetry = this[_defaultFlag] && this[_flags] === 'r+'
    let fd

    if (!mayRetry)
      fd = fs.openSync(this[_path], this[_flags], this[_mode])
    else {
      try {
        fd = fs.openSync(this[_path], this[_flags], this[_mode])
      } catch (er) {
        if (er.code !== 'ENOENT')
          throw er
        // the file does not exist yet: open again with 'w' to create it
        this[_flags] = 'w'
        return this[_open]()
      }
    }

    this[_onopen](null, fd)
  }

  // close the descriptor synchronously, then announce it
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number')
      return

    const openFd = this[_fd]
    // clear first so the descriptor is never closed twice
    this[_fd] = null
    fs.closeSync(openFd)
    this.emit('close')
  }

  [_write] (buf) {
    // throw the original, but try to close if it fails
    let completed = false
    try {
      const written = fs.writeSync(this[_fd], buf, 0, buf.length, this[_pos])
      this[_onwrite](null, written)
      completed = true
    } finally {
      if (!completed) {
        // a failure while closing must not replace the error being thrown
        try { this[_close]() } catch (_) {}
      }
    }
  }
}
 | 
						|
 | 
						|
// Public API: the asynchronous and synchronous stream classes.
Object.assign(exports, {
  ReadStream,
  ReadStreamSync,
  WriteStream,
  WriteStreamSync,
})
 |