// Residue from the web source viewer (not part of the module):
//   "You cannot select more than 25 topics."
//   "Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long."
//   File listing: 528 lines, 13 KiB, last modified 3 years ago.
'use strict'

const { version } = require('./package.json')
const { EventEmitter } = require('events')
const { Worker } = require('worker_threads')
const { join } = require('path')
const { pathToFileURL } = require('url')
const { wait } = require('./lib/wait')
const {
  WRITE_INDEX,
  READ_INDEX
} = require('./lib/indexes')
const buffer = require('buffer')
const assert = require('assert')

// Symbol key under which every ThreadStream instance stores its internal state
// object, keeping that state off the public property namespace.
const kImpl = Symbol('kImpl')

// V8 limit for string size
const MAX_STRING = buffer.constants.MAX_STRING_LENGTH

/**
 * Minimal stand-in exposing the `deref()` method of a WeakRef while actually
 * holding a strong reference to the wrapped value.
 */
class FakeWeakRef {
  /**
   * @param {*} value - the value to hold on to.
   */
  constructor (value) {
    this._value = value
  }

  /**
   * @returns {*} the value passed to the constructor (never collected).
   */
  deref () {
    return this._value
  }
}

// Use the native FinalizationRegistry when the runtime provides one; otherwise
// fall back to a no-op implementation so register()/unregister() can still be called.
const FinalizationRegistry = global.FinalizationRegistry || class FakeFinalizationRegistry {
  register () {}

  unregister () {}
}

// Same idea for WeakRef: fall back to the strong-reference stand-in.
const WeakRef = global.WeakRef || FakeWeakRef

// Streams are registered with their worker as the held value. When a stream is
// garbage collected, its worker is terminated unless it has already exited.
const registry = new FinalizationRegistry((worker) => {
  if (worker.exited) {
    return
  }
  worker.terminate()
})

/**
 * Spawns the worker thread backing `stream` and wires up its listeners.
 *
 * @param {ThreadStream} stream - the stream that owns the worker; its shared
 *   data and state buffers are handed to the worker through `workerData`.
 * @param {object} opts - stream options. `opts.filename` is passed to the
 *   worker as a `file://` URL, `opts.workerData` is forwarded to it, and
 *   `opts.workerOpts` is spread into the Worker constructor options.
 * @returns {Worker} the newly created worker.
 */
function createWorker (stream, opts) {
  const { filename, workerData } = opts

  // Bundlers can redirect the worker entry point through a global override map.
  const bundlerOverrides = '__bundlerPathsOverrides' in globalThis ? globalThis.__bundlerPathsOverrides : {}
  const toExecute = bundlerOverrides['thread-stream-worker'] || join(__dirname, 'lib', 'worker.js')

  const worker = new Worker(toExecute, {
    ...opts.workerOpts,
    workerData: {
      // Pass file:// URLs through untouched; convert plain paths to URLs.
      filename: filename.indexOf('file://') === 0
        ? filename
        : pathToFileURL(filename).href,
      dataBuf: stream[kImpl].dataBuf,
      stateBuf: stream[kImpl].stateBuf,
      workerData: {
        $context: {
          threadStreamVersion: version
        },
        // Spread last, so caller-supplied keys (including $context) take precedence.
        ...workerData
      }
    }
  })

  // We keep a strong reference for now,
  // we need to start writing first
  worker.stream = new FakeWeakRef(stream)

  worker.on('message', onWorkerMessage)
  worker.on('exit', onWorkerExit)
  registry.register(stream, worker)

  return worker
}

/**
 * Emits 'drain' on an asynchronous stream if a previous write() reported
 * back-pressure, and clears the back-pressure flag.
 *
 * @param {ThreadStream} stream - must not be in sync mode (asserted).
 */
function drain (stream) {
  assert(!stream[kImpl].sync)
  if (stream[kImpl].needDrain) {
    stream[kImpl].needDrain = false
    stream.emit('drain')
  }
}

/**
 * One step of the asynchronous flush loop: moves as much of the pending string
 * buffer as fits into the shared data buffer, then re-schedules itself through
 * the write() callback until the pending buffer is empty.
 *
 * @param {ThreadStream} stream - the stream whose pending buffer is flushed.
 */
function nextFlush (stream) {
  const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
  // Free bytes between the current write position and the end of the shared buffer.
  let leftover = stream[kImpl].data.length - writeIndex

  if (leftover > 0) {
    if (stream[kImpl].buf.length === 0) {
      // Nothing left to write: the flush cycle is over.
      stream[kImpl].flushing = false

      if (stream[kImpl].ending) {
        end(stream)
      } else if (stream[kImpl].needDrain) {
        process.nextTick(drain, stream)
      }

      return
    }

    // Take `leftover` characters; with multi-byte characters this may encode
    // to more than `leftover` bytes, which is checked right below.
    let toWrite = stream[kImpl].buf.slice(0, leftover)
    let toWriteBytes = Buffer.byteLength(toWrite)
    if (toWriteBytes <= leftover) {
      stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
      // process._rawDebug('writing ' + toWrite.length)
      write(stream, toWrite, nextFlush.bind(null, stream))
    } else {
      // multi-byte utf-8
      stream.flush(() => {
        // err is already handled in flush()
        if (stream.destroyed) {
          return
        }

        // Rewind both indexes to the start of the shared buffer.
        Atomics.store(stream[kImpl].state, READ_INDEX, 0)
        Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)

        // Find a toWrite length that fits the buffer
        // it must exists as the buffer is at least 4 bytes length
        // and the max utf-8 length for a char is 4 bytes.
        while (toWriteBytes > stream[kImpl].data.length) {
          leftover = leftover / 2
          toWrite = stream[kImpl].buf.slice(0, leftover)
          toWriteBytes = Buffer.byteLength(toWrite)
        }
        stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
        write(stream, toWrite, nextFlush.bind(null, stream))
      })
    }
  } else if (leftover === 0) {
    if (writeIndex === 0 && stream[kImpl].buf.length === 0) {
      // we had a flushSync in the meanwhile
      return
    }
    // The shared buffer is full: wait for the reader to catch up, rewind both
    // indexes and continue flushing.
    stream.flush(() => {
      Atomics.store(stream[kImpl].state, READ_INDEX, 0)
      Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
      nextFlush(stream)
    })
  } else {
    // This should never happen
    destroy(stream, new Error('overwritten'))
  }
}

/**
 * 'message' listener installed on the worker; `this` is the Worker.
 * Dispatches on `msg.code`: 'READY', 'ERROR' or 'EVENT'. Any other code
 * destroys the stream.
 *
 * @param {{code: string, err?: *, name?: string, args?: *}} msg - message posted by the worker.
 */
function onWorkerMessage (msg) {
  const stream = this.stream.deref()
  if (stream === undefined) {
    // The owning stream has been garbage collected.
    this.exited = true
    // Terminate the worker.
    this.terminate()
    return
  }

  switch (msg.code) {
    case 'READY':
      // Replace the FakeWeakRef with a
      // proper one.
      this.stream = new WeakRef(stream)

      stream.flush(() => {
        stream[kImpl].ready = true
        stream.emit('ready')
      })
      break
    case 'ERROR':
      destroy(stream, msg.err)
      break
    case 'EVENT':
      // Re-emit the worker's event on the stream; array args are spread.
      if (Array.isArray(msg.args)) {
        stream.emit(msg.name, ...msg.args)
      } else {
        stream.emit(msg.name, msg.args)
      }
      break
    default:
      destroy(stream, new Error('this should not happen: ' + msg.code))
  }
}

/**
 * 'exit' listener installed on the worker; `this` is the Worker.
 * Marks the worker as exited and destroys the owning stream, with an error
 * when the exit code is non-zero.
 *
 * @param {number} code - the worker's exit code.
 */
function onWorkerExit (code) {
  const stream = this.stream.deref()
  if (stream === undefined) {
    // Nothing to do, the worker already exit
    return
  }
  registry.unregister(stream)
  stream.worker.exited = true
  stream.worker.off('exit', onWorkerExit)
  destroy(stream, code !== 0 ? new Error('the worker thread exited') : null)
}

/**
 * Writable-like stream that hands string data to a worker thread through a
 * shared data buffer and a shared state array holding read/write indexes.
 *
 * Events emitted by the code in this file: 'ready', 'drain', 'finish',
 * 'error', 'close', plus any event forwarded by the worker.
 */
class ThreadStream extends EventEmitter {
  /**
   * @param {object} [opts]
   * @param {number} [opts.bufferSize] - size in bytes of the shared data
   *   buffer; defaults to 4 MiB and must be at least 4.
   * @param {boolean} [opts.sync=false] - when true, write() flushes synchronously.
   * @param {string} opts.filename - forwarded to createWorker (worker target).
   * @param {object} [opts.workerData] - forwarded to the worker.
   * @param {object} [opts.workerOpts] - extra Worker constructor options.
   * @throws {Error} if `opts.bufferSize` is smaller than 4.
   */
  constructor (opts = {}) {
    super()

    if (opts.bufferSize < 4) {
      throw new Error('bufferSize must at least fit a 4-byte utf-8 char')
    }

    this[kImpl] = {}
    this[kImpl].stateBuf = new SharedArrayBuffer(128)
    this[kImpl].state = new Int32Array(this[kImpl].stateBuf)
    this[kImpl].dataBuf = new SharedArrayBuffer(opts.bufferSize || 4 * 1024 * 1024)
    this[kImpl].data = Buffer.from(this[kImpl].dataBuf)
    this[kImpl].sync = opts.sync || false
    this[kImpl].ending = false
    this[kImpl].ended = false
    this[kImpl].needDrain = false
    this[kImpl].destroyed = false
    this[kImpl].flushing = false
    this[kImpl].ready = false
    this[kImpl].finished = false
    this[kImpl].errored = null
    this[kImpl].closed = false
    // Pending string data not yet copied into the shared buffer.
    this[kImpl].buf = ''

    // TODO (fix): Make private?
    this.worker = createWorker(this, opts)
  }

  /**
   * Queues `data` for the worker.
   *
   * @param {string} data - text to append to the pending buffer.
   * @returns {boolean} false when the stream is destroyed or ending, when a
   *   synchronous write fails, or when the caller should wait for 'drain';
   *   true otherwise.
   */
  write (data) {
    if (this[kImpl].destroyed) {
      error(this, new Error('the worker has exited'))
      return false
    }

    if (this[kImpl].ending) {
      error(this, new Error('the worker is ending'))
      return false
    }

    // Appending would exceed the maximum string length: drain the pending
    // buffer synchronously first, then restore the flushing flag that
    // writeSync() clears.
    if (this[kImpl].flushing && this[kImpl].buf.length + data.length >= MAX_STRING) {
      try {
        writeSync(this)
        this[kImpl].flushing = true
      } catch (err) {
        destroy(this, err)
        return false
      }
    }

    this[kImpl].buf += data

    if (this[kImpl].sync) {
      try {
        writeSync(this)
        return true
      } catch (err) {
        destroy(this, err)
        return false
      }
    }

    if (!this[kImpl].flushing) {
      this[kImpl].flushing = true
      setImmediate(nextFlush, this)
    }

    // Back-pressure: pending characters plus the current write index no
    // longer fit in the shared buffer.
    this[kImpl].needDrain = this[kImpl].data.length - this[kImpl].buf.length - Atomics.load(this[kImpl].state, WRITE_INDEX) <= 0
    return !this[kImpl].needDrain
  }

  /**
   * Marks the stream as ending and attempts to finish it. If a flush is in
   * progress, the module-level end() returns early and is invoked again once
   * the flush completes. No-op on a destroyed stream.
   */
  end () {
    if (this[kImpl].destroyed) {
      return
    }

    this[kImpl].ending = true
    end(this)
  }

  /**
   * Waits asynchronously until the worker's read index equals the write
   * index observed when flush() was called.
   *
   * @param {function(Error=): void} [cb] - optional; invoked on a later tick
   *   with an error if the stream is destroyed or waiting fails, or with no
   *   arguments on success.
   */
  flush (cb) {
    // process.nextTick() throws when handed a non-function, so every path
    // below schedules the callback only when one was actually supplied.
    const hasCallback = typeof cb === 'function'

    if (this[kImpl].destroyed) {
      if (hasCallback) {
        process.nextTick(cb, new Error('the worker has exited'))
      }
      return
    }

    // TODO write all .buf
    const writeIndex = Atomics.load(this[kImpl].state, WRITE_INDEX)
    // process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`)
    wait(this[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => {
      if (err) {
        destroy(this, err)
        if (hasCallback) {
          process.nextTick(cb, err)
        }
        return
      }
      if (res === 'not-equal') {
        // TODO handle deadlock
        this.flush(cb)
        return
      }
      if (hasCallback) {
        process.nextTick(cb)
      }
    })
  }

  /**
   * Synchronously writes all pending data to the shared buffer and blocks
   * until the worker has read it. No-op on a destroyed stream.
   *
   * @throws {Error} propagated from writeSync()/flushSync() on failure or timeout.
   */
  flushSync () {
    if (this[kImpl].destroyed) {
      return
    }

    writeSync(this)
    flushSync(this)
  }

  /** Calls unref() on the underlying worker. */
  unref () {
    this.worker.unref()
  }

  /** Calls ref() on the underlying worker. */
  ref () {
    this.worker.ref()
  }

  /** @returns {boolean} true once the worker reported 'READY' and the initial flush completed. */
  get ready () {
    return this[kImpl].ready
  }

  /** @returns {boolean} true once the stream has been destroyed. */
  get destroyed () {
    return this[kImpl].destroyed
  }

  /** @returns {boolean} true once 'close' has been emitted. */
  get closed () {
    return this[kImpl].closed
  }

  /** @returns {boolean} true while the stream is neither destroyed nor ending. */
  get writable () {
    return !this[kImpl].destroyed && !this[kImpl].ending
  }

  /** @returns {boolean} true once end() has been called. */
  get writableEnded () {
    return this[kImpl].ending
  }

  /** @returns {boolean} true once 'finish' has been emitted. */
  get writableFinished () {
    return this[kImpl].finished
  }

  /** @returns {boolean} true when the last write() signalled back-pressure. */
  get writableNeedDrain () {
    return this[kImpl].needDrain
  }

  /** @returns {boolean} always false. */
  get writableObjectMode () {
    return false
  }

  /** @returns {?Error} the error the stream was destroyed with, or null. */
  get writableErrored () {
    return this[kImpl].errored
  }
}

/**
 * Emits 'error' on the stream from a setImmediate callback, i.e. never
 * synchronously within the caller.
 *
 * @param {ThreadStream} stream - the stream to emit on.
 * @param {Error} err - the error to emit.
 */
function error (stream, err) {
  setImmediate(() => {
    stream.emit('error', err)
  })
}

/**
 * Destroys the stream once: records and emits the error (if any), terminates
 * the worker when it is still running, then marks the stream closed and
 * emits 'close'. Subsequent calls are no-ops.
 *
 * @param {ThreadStream} stream - the stream to destroy.
 * @param {?Error} err - optional reason; when truthy it is stored and emitted as 'error'.
 */
function destroy (stream, err) {
  if (stream[kImpl].destroyed) {
    return
  }
  stream[kImpl].destroyed = true

  if (err) {
    stream[kImpl].errored = err
    error(stream, err)
  }

  if (!stream.worker.exited) {
    stream.worker.terminate()
      // A failed terminate() must not prevent 'close' from being emitted.
      .catch(() => {})
      .then(() => {
        stream[kImpl].closed = true
        stream.emit('close')
      })
  } else {
    setImmediate(() => {
      stream[kImpl].closed = true
      stream.emit('close')
    })
  }
}

/**
 * Copies `data` into the shared buffer at the current write index, advances
 * the index by the encoded byte length, notifies waiters on WRITE_INDEX and
 * then invokes `cb` synchronously.
 *
 * @param {ThreadStream} stream - the stream owning the shared buffers.
 * @param {string} data - text to copy; callers ensure it fits in the remaining space.
 * @param {function(): void} cb - called once the index has been published.
 * @returns {boolean} always true.
 */
function write (stream, data, cb) {
  // data is smaller than the shared buffer length
  const current = Atomics.load(stream[kImpl].state, WRITE_INDEX)
  const length = Buffer.byteLength(data)
  stream[kImpl].data.write(data, current)
  Atomics.store(stream[kImpl].state, WRITE_INDEX, current + length)
  Atomics.notify(stream[kImpl].state, WRITE_INDEX)
  cb()
  return true
}

/**
 * Finishes the stream: flushes synchronously, stores -1 in WRITE_INDEX and
 * blocks until READ_INDEX becomes -1, then emits 'finish' on the next tick.
 * Does nothing unless the stream is ending, not yet ended and not flushing.
 *
 * A READ_INDEX of -2, or ten one-second waits without completion, destroys
 * the stream with an error instead.
 *
 * @param {ThreadStream} stream - the stream to finish.
 */
function end (stream) {
  if (stream[kImpl].ended || !stream[kImpl].ending || stream[kImpl].flushing) {
    return
  }
  stream[kImpl].ended = true

  try {
    stream.flushSync()

    let readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)

    // process._rawDebug('writing index')
    Atomics.store(stream[kImpl].state, WRITE_INDEX, -1)
    // process._rawDebug(`(end) readIndex (${Atomics.load(stream.state, READ_INDEX)}) writeIndex (${Atomics.load(stream.state, WRITE_INDEX)})`)
    Atomics.notify(stream[kImpl].state, WRITE_INDEX)

    // Wait for the process to complete
    let spins = 0
    while (readIndex !== -1) {
      // process._rawDebug(`read = ${read}`)
      Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000)
      readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)

      if (readIndex === -2) {
        destroy(stream, new Error('end() failed'))
        return
      }

      if (++spins === 10) {
        destroy(stream, new Error('end() took too long (10s)'))
        return
      }
    }

    process.nextTick(() => {
      stream[kImpl].finished = true
      stream.emit('finish')
    })
  } catch (err) {
    destroy(stream, err)
  }
  // process._rawDebug('end finished...')
}

/**
 * Synchronously moves the whole pending string buffer into the shared data
 * buffer, blocking on flushSync() whenever the shared buffer is full.
 * Clears the `flushing` flag before starting.
 *
 * @param {ThreadStream} stream - the stream whose pending buffer is written.
 * @throws {Error} 'overwritten' if the write index is past the end of the
 *   shared buffer, or whatever flushSync() throws.
 */
function writeSync (stream) {
  // Invoked after every chunk is published to the shared buffer.
  const cb = () => {
    if (stream[kImpl].ending) {
      end(stream)
    } else if (stream[kImpl].needDrain) {
      process.nextTick(drain, stream)
    }
  }
  stream[kImpl].flushing = false

  while (stream[kImpl].buf.length !== 0) {
    const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
    // Free bytes between the current write position and the end of the shared buffer.
    let leftover = stream[kImpl].data.length - writeIndex
    if (leftover === 0) {
      // Shared buffer is full: wait for the reader, then rewind both indexes.
      flushSync(stream)
      Atomics.store(stream[kImpl].state, READ_INDEX, 0)
      Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
      continue
    } else if (leftover < 0) {
      // this should never happen
      throw new Error('overwritten')
    }

    // Take `leftover` characters; with multi-byte characters this may encode
    // to more than `leftover` bytes, which is checked right below.
    let toWrite = stream[kImpl].buf.slice(0, leftover)
    let toWriteBytes = Buffer.byteLength(toWrite)
    if (toWriteBytes <= leftover) {
      stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
      // process._rawDebug('writing ' + toWrite.length)
      write(stream, toWrite, cb)
    } else {
      // multi-byte utf-8
      flushSync(stream)
      Atomics.store(stream[kImpl].state, READ_INDEX, 0)
      Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)

      // Find a toWrite length that fits the buffer
      // it must exists as the buffer is at least 4 bytes length
      // and the max utf-8 length for a char is 4 bytes.
      // The bound is the capacity of the shared buffer (data.length), which
      // is what write() is limited by, not the length of the pending string.
      while (toWriteBytes > stream[kImpl].data.length) {
        // Floor keeps the slice index integral; String#slice truncates anyway.
        leftover = Math.floor(leftover / 2)
        toWrite = stream[kImpl].buf.slice(0, leftover)
        toWriteBytes = Buffer.byteLength(toWrite)
      }
      stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
      write(stream, toWrite, cb)
    }
  }
}

/**
 * Blocks the calling thread until READ_INDEX equals the WRITE_INDEX observed
 * on entry, waiting in slices of up to one second.
 *
 * @param {ThreadStream} stream - the stream to wait on.
 * @throws {Error} if an asynchronous flush is in progress, if READ_INDEX
 *   becomes -2, or after ten waits without the indexes matching.
 */
function flushSync (stream) {
  if (stream[kImpl].flushing) {
    throw new Error('unable to flush while flushing')
  }

  // process._rawDebug('flushSync started')

  const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)

  let spins = 0

  // TODO handle deadlock
  while (true) {
    const readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)

    if (readIndex === -2) {
      throw new Error('_flushSync failed')
    }

    // process._rawDebug(`(flushSync) readIndex (${readIndex}) writeIndex (${writeIndex})`)
    if (readIndex !== writeIndex) {
      // TODO this timeouts for some reason.
      Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000)
    } else {
      break
    }

    if (++spins === 10) {
      throw new Error('_flushSync took too long (10s)')
    }
  }
  // process._rawDebug('flushSync finished')
}

// The class itself is the module's sole export.
module.exports = ThreadStream