You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					528 lines
				
				13 KiB
			
		
		
			
		
	
	
					528 lines
				
				13 KiB
			| 
								 
											3 years ago
										 
									 | 
							
								'use strict'
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								const { version } = require('./package.json')
							 | 
						||
| 
								 | 
							
								const { EventEmitter } = require('events')
							 | 
						||
| 
								 | 
							
								const { Worker } = require('worker_threads')
							 | 
						||
| 
								 | 
							
								const { join } = require('path')
							 | 
						||
| 
								 | 
							
								const { pathToFileURL } = require('url')
							 | 
						||
| 
								 | 
							
								const { wait } = require('./lib/wait')
							 | 
						||
| 
								 | 
							
								const {
							 | 
						||
| 
								 | 
							
								  WRITE_INDEX,
							 | 
						||
| 
								 | 
							
								  READ_INDEX
							 | 
						||
| 
								 | 
							
								} = require('./lib/indexes')
							 | 
						||
| 
								 | 
							
								const buffer = require('buffer')
							 | 
						||
| 
								 | 
							
								const assert = require('assert')
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								const kImpl = Symbol('kImpl')
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// V8 limit for string size
							 | 
						||
| 
								 | 
							
								const MAX_STRING = buffer.constants.MAX_STRING_LENGTH
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class FakeWeakRef {
							 | 
						||
| 
								 | 
							
								  constructor (value) {
							 | 
						||
| 
								 | 
							
								    this._value = value
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  deref () {
							 | 
						||
| 
								 | 
							
								    return this._value
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								const FinalizationRegistry = global.FinalizationRegistry || class FakeFinalizationRegistry {
							 | 
						||
| 
								 | 
							
								  register () {}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  unregister () {}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								const WeakRef = global.WeakRef || FakeWeakRef
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								const registry = new FinalizationRegistry((worker) => {
							 | 
						||
| 
								 | 
							
								  if (worker.exited) {
							 | 
						||
| 
								 | 
							
								    return
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  worker.terminate()
							 | 
						||
| 
								 | 
							
								})
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								function createWorker (stream, opts) {
							 | 
						||
| 
								 | 
							
								  const { filename, workerData } = opts
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  const bundlerOverrides = '__bundlerPathsOverrides' in globalThis ? globalThis.__bundlerPathsOverrides : {}
							 | 
						||
| 
								 | 
							
								  const toExecute = bundlerOverrides['thread-stream-worker'] || join(__dirname, 'lib', 'worker.js')
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  const worker = new Worker(toExecute, {
							 | 
						||
| 
								 | 
							
								    ...opts.workerOpts,
							 | 
						||
| 
								 | 
							
								    workerData: {
							 | 
						||
| 
								 | 
							
								      filename: filename.indexOf('file://') === 0
							 | 
						||
| 
								 | 
							
								        ? filename
							 | 
						||
| 
								 | 
							
								        : pathToFileURL(filename).href,
							 | 
						||
| 
								 | 
							
								      dataBuf: stream[kImpl].dataBuf,
							 | 
						||
| 
								 | 
							
								      stateBuf: stream[kImpl].stateBuf,
							 | 
						||
| 
								 | 
							
								      workerData: {
							 | 
						||
| 
								 | 
							
								        $context: {
							 | 
						||
| 
								 | 
							
								          threadStreamVersion: version
							 | 
						||
| 
								 | 
							
								        },
							 | 
						||
| 
								 | 
							
								        ...workerData
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  })
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  // We keep a strong reference for now,
							 | 
						||
| 
								 | 
							
								  // we need to start writing first
							 | 
						||
| 
								 | 
							
								  worker.stream = new FakeWeakRef(stream)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  worker.on('message', onWorkerMessage)
							 | 
						||
| 
								 | 
							
								  worker.on('exit', onWorkerExit)
							 | 
						||
| 
								 | 
							
								  registry.register(stream, worker)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  return worker
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								function drain (stream) {
							 | 
						||
| 
								 | 
							
								  assert(!stream[kImpl].sync)
							 | 
						||
| 
								 | 
							
								  if (stream[kImpl].needDrain) {
							 | 
						||
| 
								 | 
							
								    stream[kImpl].needDrain = false
							 | 
						||
| 
								 | 
							
								    stream.emit('drain')
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								function nextFlush (stream) {
							 | 
						||
| 
								 | 
							
								  const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
							 | 
						||
| 
								 | 
							
								  let leftover = stream[kImpl].data.length - writeIndex
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  if (leftover > 0) {
							 | 
						||
| 
								 | 
							
								    if (stream[kImpl].buf.length === 0) {
							 | 
						||
| 
								 | 
							
								      stream[kImpl].flushing = false
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								      if (stream[kImpl].ending) {
							 | 
						||
| 
								 | 
							
								        end(stream)
							 | 
						||
| 
								 | 
							
								      } else if (stream[kImpl].needDrain) {
							 | 
						||
| 
								 | 
							
								        process.nextTick(drain, stream)
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								      return
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    let toWrite = stream[kImpl].buf.slice(0, leftover)
							 | 
						||
| 
								 | 
							
								    let toWriteBytes = Buffer.byteLength(toWrite)
							 | 
						||
| 
								 | 
							
								    if (toWriteBytes <= leftover) {
							 | 
						||
| 
								 | 
							
								      stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
							 | 
						||
| 
								 | 
							
								      // process._rawDebug('writing ' + toWrite.length)
							 | 
						||
| 
								 | 
							
								      write(stream, toWrite, nextFlush.bind(null, stream))
							 | 
						||
| 
								 | 
							
								    } else {
							 | 
						||
| 
								 | 
							
								      // multi-byte utf-8
							 | 
						||
| 
								 | 
							
								      stream.flush(() => {
							 | 
						||
| 
								 | 
							
								        // err is already handled in flush()
							 | 
						||
| 
								 | 
							
								        if (stream.destroyed) {
							 | 
						||
| 
								 | 
							
								          return
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        Atomics.store(stream[kImpl].state, READ_INDEX, 0)
							 | 
						||
| 
								 | 
							
								        Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        // Find a toWrite length that fits the buffer
							 | 
						||
| 
								 | 
							
								        // it must exists as the buffer is at least 4 bytes length
							 | 
						||
| 
								 | 
							
								        // and the max utf-8 length for a char is 4 bytes.
							 | 
						||
| 
								 | 
							
								        while (toWriteBytes > stream[kImpl].data.length) {
							 | 
						||
| 
								 | 
							
								          leftover = leftover / 2
							 | 
						||
| 
								 | 
							
								          toWrite = stream[kImpl].buf.slice(0, leftover)
							 | 
						||
| 
								 | 
							
								          toWriteBytes = Buffer.byteLength(toWrite)
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								        stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
							 | 
						||
| 
								 | 
							
								        write(stream, toWrite, nextFlush.bind(null, stream))
							 | 
						||
| 
								 | 
							
								      })
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  } else if (leftover === 0) {
							 | 
						||
| 
								 | 
							
								    if (writeIndex === 0 && stream[kImpl].buf.length === 0) {
							 | 
						||
| 
								 | 
							
								      // we had a flushSync in the meanwhile
							 | 
						||
| 
								 | 
							
								      return
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    stream.flush(() => {
							 | 
						||
| 
								 | 
							
								      Atomics.store(stream[kImpl].state, READ_INDEX, 0)
							 | 
						||
| 
								 | 
							
								      Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
							 | 
						||
| 
								 | 
							
								      nextFlush(stream)
							 | 
						||
| 
								 | 
							
								    })
							 | 
						||
| 
								 | 
							
								  } else {
							 | 
						||
| 
								 | 
							
								    // This should never happen
							 | 
						||
| 
								 | 
							
								    destroy(stream, new Error('overwritten'))
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								function onWorkerMessage (msg) {
							 | 
						||
| 
								 | 
							
								  const stream = this.stream.deref()
							 | 
						||
| 
								 | 
							
								  if (stream === undefined) {
							 | 
						||
| 
								 | 
							
								    this.exited = true
							 | 
						||
| 
								 | 
							
								    // Terminate the worker.
							 | 
						||
| 
								 | 
							
								    this.terminate()
							 | 
						||
| 
								 | 
							
								    return
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  switch (msg.code) {
							 | 
						||
| 
								 | 
							
								    case 'READY':
							 | 
						||
| 
								 | 
							
								      // Replace the FakeWeakRef with a
							 | 
						||
| 
								 | 
							
								      // proper one.
							 | 
						||
| 
								 | 
							
								      this.stream = new WeakRef(stream)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								      stream.flush(() => {
							 | 
						||
| 
								 | 
							
								        stream[kImpl].ready = true
							 | 
						||
| 
								 | 
							
								        stream.emit('ready')
							 | 
						||
| 
								 | 
							
								      })
							 | 
						||
| 
								 | 
							
								      break
							 | 
						||
| 
								 | 
							
								    case 'ERROR':
							 | 
						||
| 
								 | 
							
								      destroy(stream, msg.err)
							 | 
						||
| 
								 | 
							
								      break
							 | 
						||
| 
								 | 
							
								    case 'EVENT':
							 | 
						||
| 
								 | 
							
								      if (Array.isArray(msg.args)) {
							 | 
						||
| 
								 | 
							
								        stream.emit(msg.name, ...msg.args)
							 | 
						||
| 
								 | 
							
								      } else {
							 | 
						||
| 
								 | 
							
								        stream.emit(msg.name, msg.args)
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								      break
							 | 
						||
| 
								 | 
							
								    default:
							 | 
						||
| 
								 | 
							
								      destroy(stream, new Error('this should not happen: ' + msg.code))
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								function onWorkerExit (code) {
							 | 
						||
| 
								 | 
							
								  const stream = this.stream.deref()
							 | 
						||
| 
								 | 
							
								  if (stream === undefined) {
							 | 
						||
| 
								 | 
							
								    // Nothing to do, the worker already exit
							 | 
						||
| 
								 | 
							
								    return
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  registry.unregister(stream)
							 | 
						||
| 
								 | 
							
								  stream.worker.exited = true
							 | 
						||
| 
								 | 
							
								  stream.worker.off('exit', onWorkerExit)
							 | 
						||
| 
								 | 
							
								  destroy(stream, code !== 0 ? new Error('the worker thread exited') : null)
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class ThreadStream extends EventEmitter {
							 | 
						||
| 
								 | 
							
								  constructor (opts = {}) {
							 | 
						||
| 
								 | 
							
								    super()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    if (opts.bufferSize < 4) {
							 | 
						||
| 
								 | 
							
								      throw new Error('bufferSize must at least fit a 4-byte utf-8 char')
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    this[kImpl] = {}
							 | 
						||
| 
								 | 
							
								    this[kImpl].stateBuf = new SharedArrayBuffer(128)
							 | 
						||
| 
								 | 
							
								    this[kImpl].state = new Int32Array(this[kImpl].stateBuf)
							 | 
						||
| 
								 | 
							
								    this[kImpl].dataBuf = new SharedArrayBuffer(opts.bufferSize || 4 * 1024 * 1024)
							 | 
						||
| 
								 | 
							
								    this[kImpl].data = Buffer.from(this[kImpl].dataBuf)
							 | 
						||
| 
								 | 
							
								    this[kImpl].sync = opts.sync || false
							 | 
						||
| 
								 | 
							
								    this[kImpl].ending = false
							 | 
						||
| 
								 | 
							
								    this[kImpl].ended = false
							 | 
						||
| 
								 | 
							
								    this[kImpl].needDrain = false
							 | 
						||
| 
								 | 
							
								    this[kImpl].destroyed = false
							 | 
						||
| 
								 | 
							
								    this[kImpl].flushing = false
							 | 
						||
| 
								 | 
							
								    this[kImpl].ready = false
							 | 
						||
| 
								 | 
							
								    this[kImpl].finished = false
							 | 
						||
| 
								 | 
							
								    this[kImpl].errored = null
							 | 
						||
| 
								 | 
							
								    this[kImpl].closed = false
							 | 
						||
| 
								 | 
							
								    this[kImpl].buf = ''
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    // TODO (fix): Make private?
							 | 
						||
| 
								 | 
							
								    this.worker = createWorker(this, opts) // TODO (fix): make private
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  write (data) {
							 | 
						||
| 
								 | 
							
								    if (this[kImpl].destroyed) {
							 | 
						||
| 
								 | 
							
								      error(this, new Error('the worker has exited'))
							 | 
						||
| 
								 | 
							
								      return false
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    if (this[kImpl].ending) {
							 | 
						||
| 
								 | 
							
								      error(this, new Error('the worker is ending'))
							 | 
						||
| 
								 | 
							
								      return false
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    if (this[kImpl].flushing && this[kImpl].buf.length + data.length >= MAX_STRING) {
							 | 
						||
| 
								 | 
							
								      try {
							 | 
						||
| 
								 | 
							
								        writeSync(this)
							 | 
						||
| 
								 | 
							
								        this[kImpl].flushing = true
							 | 
						||
| 
								 | 
							
								      } catch (err) {
							 | 
						||
| 
								 | 
							
								        destroy(this, err)
							 | 
						||
| 
								 | 
							
								        return false
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    this[kImpl].buf += data
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    if (this[kImpl].sync) {
							 | 
						||
| 
								 | 
							
								      try {
							 | 
						||
| 
								 | 
							
								        writeSync(this)
							 | 
						||
| 
								 | 
							
								        return true
							 | 
						||
| 
								 | 
							
								      } catch (err) {
							 | 
						||
| 
								 | 
							
								        destroy(this, err)
							 | 
						||
| 
								 | 
							
								        return false
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    if (!this[kImpl].flushing) {
							 | 
						||
| 
								 | 
							
								      this[kImpl].flushing = true
							 | 
						||
| 
								 | 
							
								      setImmediate(nextFlush, this)
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    this[kImpl].needDrain = this[kImpl].data.length - this[kImpl].buf.length - Atomics.load(this[kImpl].state, WRITE_INDEX) <= 0
							 | 
						||
| 
								 | 
							
								    return !this[kImpl].needDrain
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  end () {
							 | 
						||
| 
								 | 
							
								    if (this[kImpl].destroyed) {
							 | 
						||
| 
								 | 
							
								      return
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    this[kImpl].ending = true
							 | 
						||
| 
								 | 
							
								    end(this)
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  flush (cb) {
							 | 
						||
| 
								 | 
							
								    if (this[kImpl].destroyed) {
							 | 
						||
| 
								 | 
							
								      if (typeof cb === 'function') {
							 | 
						||
| 
								 | 
							
								        process.nextTick(cb, new Error('the worker has exited'))
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								      return
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    // TODO write all .buf
							 | 
						||
| 
								 | 
							
								    const writeIndex = Atomics.load(this[kImpl].state, WRITE_INDEX)
							 | 
						||
| 
								 | 
							
								    // process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`)
							 | 
						||
| 
								 | 
							
								    wait(this[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => {
							 | 
						||
| 
								 | 
							
								      if (err) {
							 | 
						||
| 
								 | 
							
								        destroy(this, err)
							 | 
						||
| 
								 | 
							
								        process.nextTick(cb, err)
							 | 
						||
| 
								 | 
							
								        return
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								      if (res === 'not-equal') {
							 | 
						||
| 
								 | 
							
								        // TODO handle deadlock
							 | 
						||
| 
								 | 
							
								        this.flush(cb)
							 | 
						||
| 
								 | 
							
								        return
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								      process.nextTick(cb)
							 | 
						||
| 
								 | 
							
								    })
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  flushSync () {
							 | 
						||
| 
								 | 
							
								    if (this[kImpl].destroyed) {
							 | 
						||
| 
								 | 
							
								      return
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    writeSync(this)
							 | 
						||
| 
								 | 
							
								    flushSync(this)
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  unref () {
							 | 
						||
| 
								 | 
							
								    this.worker.unref()
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  ref () {
							 | 
						||
| 
								 | 
							
								    this.worker.ref()
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  get ready () {
							 | 
						||
| 
								 | 
							
								    return this[kImpl].ready
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  get destroyed () {
							 | 
						||
| 
								 | 
							
								    return this[kImpl].destroyed
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  get closed () {
							 | 
						||
| 
								 | 
							
								    return this[kImpl].closed
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  get writable () {
							 | 
						||
| 
								 | 
							
								    return !this[kImpl].destroyed && !this[kImpl].ending
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  get writableEnded () {
							 | 
						||
| 
								 | 
							
								    return this[kImpl].ending
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  get writableFinished () {
							 | 
						||
| 
								 | 
							
								    return this[kImpl].finished
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  get writableNeedDrain () {
							 | 
						||
| 
								 | 
							
								    return this[kImpl].needDrain
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  get writableObjectMode () {
							 | 
						||
| 
								 | 
							
								    return false
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  get writableErrored () {
							 | 
						||
| 
								 | 
							
								    return this[kImpl].errored
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								function error (stream, err) {
							 | 
						||
| 
								 | 
							
								  setImmediate(() => {
							 | 
						||
| 
								 | 
							
								    stream.emit('error', err)
							 | 
						||
| 
								 | 
							
								  })
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								function destroy (stream, err) {
							 | 
						||
| 
								 | 
							
								  if (stream[kImpl].destroyed) {
							 | 
						||
| 
								 | 
							
								    return
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  stream[kImpl].destroyed = true
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  if (err) {
							 | 
						||
| 
								 | 
							
								    stream[kImpl].errored = err
							 | 
						||
| 
								 | 
							
								    error(stream, err)
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  if (!stream.worker.exited) {
							 | 
						||
| 
								 | 
							
								    stream.worker.terminate()
							 | 
						||
| 
								 | 
							
								      .catch(() => {})
							 | 
						||
| 
								 | 
							
								      .then(() => {
							 | 
						||
| 
								 | 
							
								        stream[kImpl].closed = true
							 | 
						||
| 
								 | 
							
								        stream.emit('close')
							 | 
						||
| 
								 | 
							
								      })
							 | 
						||
| 
								 | 
							
								  } else {
							 | 
						||
| 
								 | 
							
								    setImmediate(() => {
							 | 
						||
| 
								 | 
							
								      stream[kImpl].closed = true
							 | 
						||
| 
								 | 
							
								      stream.emit('close')
							 | 
						||
| 
								 | 
							
								    })
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								function write (stream, data, cb) {
							 | 
						||
| 
								 | 
							
								  // data is smaller than the shared buffer length
							 | 
						||
| 
								 | 
							
								  const current = Atomics.load(stream[kImpl].state, WRITE_INDEX)
							 | 
						||
| 
								 | 
							
								  const length = Buffer.byteLength(data)
							 | 
						||
| 
								 | 
							
								  stream[kImpl].data.write(data, current)
							 | 
						||
| 
								 | 
							
								  Atomics.store(stream[kImpl].state, WRITE_INDEX, current + length)
							 | 
						||
| 
								 | 
							
								  Atomics.notify(stream[kImpl].state, WRITE_INDEX)
							 | 
						||
| 
								 | 
							
								  cb()
							 | 
						||
| 
								 | 
							
								  return true
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								function end (stream) {
							 | 
						||
| 
								 | 
							
								  if (stream[kImpl].ended || !stream[kImpl].ending || stream[kImpl].flushing) {
							 | 
						||
| 
								 | 
							
								    return
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  stream[kImpl].ended = true
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  try {
							 | 
						||
| 
								 | 
							
								    stream.flushSync()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    let readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    // process._rawDebug('writing index')
							 | 
						||
| 
								 | 
							
								    Atomics.store(stream[kImpl].state, WRITE_INDEX, -1)
							 | 
						||
| 
								 | 
							
								    // process._rawDebug(`(end) readIndex (${Atomics.load(stream.state, READ_INDEX)}) writeIndex (${Atomics.load(stream.state, WRITE_INDEX)})`)
							 | 
						||
| 
								 | 
							
								    Atomics.notify(stream[kImpl].state, WRITE_INDEX)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    // Wait for the process to complete
							 | 
						||
| 
								 | 
							
								    let spins = 0
							 | 
						||
| 
								 | 
							
								    while (readIndex !== -1) {
							 | 
						||
| 
								 | 
							
								      // process._rawDebug(`read = ${read}`)
							 | 
						||
| 
								 | 
							
								      Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000)
							 | 
						||
| 
								 | 
							
								      readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								      if (readIndex === -2) {
							 | 
						||
| 
								 | 
							
								        destroy(stream, new Error('end() failed'))
							 | 
						||
| 
								 | 
							
								        return
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								      if (++spins === 10) {
							 | 
						||
| 
								 | 
							
								        destroy(stream, new Error('end() took too long (10s)'))
							 | 
						||
| 
								 | 
							
								        return
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    process.nextTick(() => {
							 | 
						||
| 
								 | 
							
								      stream[kImpl].finished = true
							 | 
						||
| 
								 | 
							
								      stream.emit('finish')
							 | 
						||
| 
								 | 
							
								    })
							 | 
						||
| 
								 | 
							
								  } catch (err) {
							 | 
						||
| 
								 | 
							
								    destroy(stream, err)
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  // process._rawDebug('end finished...')
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								function writeSync (stream) {
							 | 
						||
| 
								 | 
							
								  const cb = () => {
							 | 
						||
| 
								 | 
							
								    if (stream[kImpl].ending) {
							 | 
						||
| 
								 | 
							
								      end(stream)
							 | 
						||
| 
								 | 
							
								    } else if (stream[kImpl].needDrain) {
							 | 
						||
| 
								 | 
							
								      process.nextTick(drain, stream)
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  stream[kImpl].flushing = false
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  while (stream[kImpl].buf.length !== 0) {
							 | 
						||
| 
								 | 
							
								    const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
							 | 
						||
| 
								 | 
							
								    let leftover = stream[kImpl].data.length - writeIndex
							 | 
						||
| 
								 | 
							
								    if (leftover === 0) {
							 | 
						||
| 
								 | 
							
								      flushSync(stream)
							 | 
						||
| 
								 | 
							
								      Atomics.store(stream[kImpl].state, READ_INDEX, 0)
							 | 
						||
| 
								 | 
							
								      Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
							 | 
						||
| 
								 | 
							
								      continue
							 | 
						||
| 
								 | 
							
								    } else if (leftover < 0) {
							 | 
						||
| 
								 | 
							
								      // stream should never happen
							 | 
						||
| 
								 | 
							
								      throw new Error('overwritten')
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    let toWrite = stream[kImpl].buf.slice(0, leftover)
							 | 
						||
| 
								 | 
							
								    let toWriteBytes = Buffer.byteLength(toWrite)
							 | 
						||
| 
								 | 
							
								    if (toWriteBytes <= leftover) {
							 | 
						||
| 
								 | 
							
								      stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
							 | 
						||
| 
								 | 
							
								      // process._rawDebug('writing ' + toWrite.length)
							 | 
						||
| 
								 | 
							
								      write(stream, toWrite, cb)
							 | 
						||
| 
								 | 
							
								    } else {
							 | 
						||
| 
								 | 
							
								      // multi-byte utf-8
							 | 
						||
| 
								 | 
							
								      flushSync(stream)
							 | 
						||
| 
								 | 
							
								      Atomics.store(stream[kImpl].state, READ_INDEX, 0)
							 | 
						||
| 
								 | 
							
								      Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								      // Find a toWrite length that fits the buffer
							 | 
						||
| 
								 | 
							
								      // it must exists as the buffer is at least 4 bytes length
							 | 
						||
| 
								 | 
							
								      // and the max utf-8 length for a char is 4 bytes.
							 | 
						||
| 
								 | 
							
								      while (toWriteBytes > stream[kImpl].buf.length) {
							 | 
						||
| 
								 | 
							
								        leftover = leftover / 2
							 | 
						||
| 
								 | 
							
								        toWrite = stream[kImpl].buf.slice(0, leftover)
							 | 
						||
| 
								 | 
							
								        toWriteBytes = Buffer.byteLength(toWrite)
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								      stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
							 | 
						||
| 
								 | 
							
								      write(stream, toWrite, cb)
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								function flushSync (stream) {
							 | 
						||
| 
								 | 
							
								  if (stream[kImpl].flushing) {
							 | 
						||
| 
								 | 
							
								    throw new Error('unable to flush while flushing')
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  // process._rawDebug('flushSync started')
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  let spins = 0
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  // TODO handle deadlock
							 | 
						||
| 
								 | 
							
								  while (true) {
							 | 
						||
| 
								 | 
							
								    const readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    if (readIndex === -2) {
							 | 
						||
| 
								 | 
							
								      throw Error('_flushSync failed')
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    // process._rawDebug(`(flushSync) readIndex (${readIndex}) writeIndex (${writeIndex})`)
							 | 
						||
| 
								 | 
							
								    if (readIndex !== writeIndex) {
							 | 
						||
| 
								 | 
							
								      // TODO stream timeouts for some reason.
							 | 
						||
| 
								 | 
							
								      Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000)
							 | 
						||
| 
								 | 
							
								    } else {
							 | 
						||
| 
								 | 
							
								      break
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    if (++spins === 10) {
							 | 
						||
| 
								 | 
							
								      throw new Error('_flushSync took too long (10s)')
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  // process._rawDebug('flushSync finished')
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								module.exports = ThreadStream
							 |