You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
528 lines
13 KiB
528 lines
13 KiB
'use strict'
|
|
|
|
const { version } = require('./package.json')
|
|
const { EventEmitter } = require('events')
|
|
const { Worker } = require('worker_threads')
|
|
const { join } = require('path')
|
|
const { pathToFileURL } = require('url')
|
|
const { wait } = require('./lib/wait')
|
|
const {
|
|
WRITE_INDEX,
|
|
READ_INDEX
|
|
} = require('./lib/indexes')
|
|
const buffer = require('buffer')
|
|
const assert = require('assert')
|
|
|
|
const kImpl = Symbol('kImpl')
|
|
|
|
// V8 limit for string size
|
|
const MAX_STRING = buffer.constants.MAX_STRING_LENGTH
|
|
|
|
class FakeWeakRef {
|
|
constructor (value) {
|
|
this._value = value
|
|
}
|
|
|
|
deref () {
|
|
return this._value
|
|
}
|
|
}
|
|
|
|
const FinalizationRegistry = global.FinalizationRegistry || class FakeFinalizationRegistry {
|
|
register () {}
|
|
|
|
unregister () {}
|
|
}
|
|
|
|
const WeakRef = global.WeakRef || FakeWeakRef
|
|
|
|
const registry = new FinalizationRegistry((worker) => {
|
|
if (worker.exited) {
|
|
return
|
|
}
|
|
worker.terminate()
|
|
})
|
|
|
|
function createWorker (stream, opts) {
|
|
const { filename, workerData } = opts
|
|
|
|
const bundlerOverrides = '__bundlerPathsOverrides' in globalThis ? globalThis.__bundlerPathsOverrides : {}
|
|
const toExecute = bundlerOverrides['thread-stream-worker'] || join(__dirname, 'lib', 'worker.js')
|
|
|
|
const worker = new Worker(toExecute, {
|
|
...opts.workerOpts,
|
|
workerData: {
|
|
filename: filename.indexOf('file://') === 0
|
|
? filename
|
|
: pathToFileURL(filename).href,
|
|
dataBuf: stream[kImpl].dataBuf,
|
|
stateBuf: stream[kImpl].stateBuf,
|
|
workerData: {
|
|
$context: {
|
|
threadStreamVersion: version
|
|
},
|
|
...workerData
|
|
}
|
|
}
|
|
})
|
|
|
|
// We keep a strong reference for now,
|
|
// we need to start writing first
|
|
worker.stream = new FakeWeakRef(stream)
|
|
|
|
worker.on('message', onWorkerMessage)
|
|
worker.on('exit', onWorkerExit)
|
|
registry.register(stream, worker)
|
|
|
|
return worker
|
|
}
|
|
|
|
function drain (stream) {
|
|
assert(!stream[kImpl].sync)
|
|
if (stream[kImpl].needDrain) {
|
|
stream[kImpl].needDrain = false
|
|
stream.emit('drain')
|
|
}
|
|
}
|
|
|
|
function nextFlush (stream) {
|
|
const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
|
|
let leftover = stream[kImpl].data.length - writeIndex
|
|
|
|
if (leftover > 0) {
|
|
if (stream[kImpl].buf.length === 0) {
|
|
stream[kImpl].flushing = false
|
|
|
|
if (stream[kImpl].ending) {
|
|
end(stream)
|
|
} else if (stream[kImpl].needDrain) {
|
|
process.nextTick(drain, stream)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
let toWrite = stream[kImpl].buf.slice(0, leftover)
|
|
let toWriteBytes = Buffer.byteLength(toWrite)
|
|
if (toWriteBytes <= leftover) {
|
|
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
|
|
// process._rawDebug('writing ' + toWrite.length)
|
|
write(stream, toWrite, nextFlush.bind(null, stream))
|
|
} else {
|
|
// multi-byte utf-8
|
|
stream.flush(() => {
|
|
// err is already handled in flush()
|
|
if (stream.destroyed) {
|
|
return
|
|
}
|
|
|
|
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
|
|
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
|
|
|
|
// Find a toWrite length that fits the buffer
|
|
// it must exists as the buffer is at least 4 bytes length
|
|
// and the max utf-8 length for a char is 4 bytes.
|
|
while (toWriteBytes > stream[kImpl].data.length) {
|
|
leftover = leftover / 2
|
|
toWrite = stream[kImpl].buf.slice(0, leftover)
|
|
toWriteBytes = Buffer.byteLength(toWrite)
|
|
}
|
|
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
|
|
write(stream, toWrite, nextFlush.bind(null, stream))
|
|
})
|
|
}
|
|
} else if (leftover === 0) {
|
|
if (writeIndex === 0 && stream[kImpl].buf.length === 0) {
|
|
// we had a flushSync in the meanwhile
|
|
return
|
|
}
|
|
stream.flush(() => {
|
|
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
|
|
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
|
|
nextFlush(stream)
|
|
})
|
|
} else {
|
|
// This should never happen
|
|
destroy(stream, new Error('overwritten'))
|
|
}
|
|
}
|
|
|
|
function onWorkerMessage (msg) {
|
|
const stream = this.stream.deref()
|
|
if (stream === undefined) {
|
|
this.exited = true
|
|
// Terminate the worker.
|
|
this.terminate()
|
|
return
|
|
}
|
|
|
|
switch (msg.code) {
|
|
case 'READY':
|
|
// Replace the FakeWeakRef with a
|
|
// proper one.
|
|
this.stream = new WeakRef(stream)
|
|
|
|
stream.flush(() => {
|
|
stream[kImpl].ready = true
|
|
stream.emit('ready')
|
|
})
|
|
break
|
|
case 'ERROR':
|
|
destroy(stream, msg.err)
|
|
break
|
|
case 'EVENT':
|
|
if (Array.isArray(msg.args)) {
|
|
stream.emit(msg.name, ...msg.args)
|
|
} else {
|
|
stream.emit(msg.name, msg.args)
|
|
}
|
|
break
|
|
default:
|
|
destroy(stream, new Error('this should not happen: ' + msg.code))
|
|
}
|
|
}
|
|
|
|
function onWorkerExit (code) {
|
|
const stream = this.stream.deref()
|
|
if (stream === undefined) {
|
|
// Nothing to do, the worker already exit
|
|
return
|
|
}
|
|
registry.unregister(stream)
|
|
stream.worker.exited = true
|
|
stream.worker.off('exit', onWorkerExit)
|
|
destroy(stream, code !== 0 ? new Error('the worker thread exited') : null)
|
|
}
|
|
|
|
class ThreadStream extends EventEmitter {
|
|
constructor (opts = {}) {
|
|
super()
|
|
|
|
if (opts.bufferSize < 4) {
|
|
throw new Error('bufferSize must at least fit a 4-byte utf-8 char')
|
|
}
|
|
|
|
this[kImpl] = {}
|
|
this[kImpl].stateBuf = new SharedArrayBuffer(128)
|
|
this[kImpl].state = new Int32Array(this[kImpl].stateBuf)
|
|
this[kImpl].dataBuf = new SharedArrayBuffer(opts.bufferSize || 4 * 1024 * 1024)
|
|
this[kImpl].data = Buffer.from(this[kImpl].dataBuf)
|
|
this[kImpl].sync = opts.sync || false
|
|
this[kImpl].ending = false
|
|
this[kImpl].ended = false
|
|
this[kImpl].needDrain = false
|
|
this[kImpl].destroyed = false
|
|
this[kImpl].flushing = false
|
|
this[kImpl].ready = false
|
|
this[kImpl].finished = false
|
|
this[kImpl].errored = null
|
|
this[kImpl].closed = false
|
|
this[kImpl].buf = ''
|
|
|
|
// TODO (fix): Make private?
|
|
this.worker = createWorker(this, opts) // TODO (fix): make private
|
|
}
|
|
|
|
write (data) {
|
|
if (this[kImpl].destroyed) {
|
|
error(this, new Error('the worker has exited'))
|
|
return false
|
|
}
|
|
|
|
if (this[kImpl].ending) {
|
|
error(this, new Error('the worker is ending'))
|
|
return false
|
|
}
|
|
|
|
if (this[kImpl].flushing && this[kImpl].buf.length + data.length >= MAX_STRING) {
|
|
try {
|
|
writeSync(this)
|
|
this[kImpl].flushing = true
|
|
} catch (err) {
|
|
destroy(this, err)
|
|
return false
|
|
}
|
|
}
|
|
|
|
this[kImpl].buf += data
|
|
|
|
if (this[kImpl].sync) {
|
|
try {
|
|
writeSync(this)
|
|
return true
|
|
} catch (err) {
|
|
destroy(this, err)
|
|
return false
|
|
}
|
|
}
|
|
|
|
if (!this[kImpl].flushing) {
|
|
this[kImpl].flushing = true
|
|
setImmediate(nextFlush, this)
|
|
}
|
|
|
|
this[kImpl].needDrain = this[kImpl].data.length - this[kImpl].buf.length - Atomics.load(this[kImpl].state, WRITE_INDEX) <= 0
|
|
return !this[kImpl].needDrain
|
|
}
|
|
|
|
end () {
|
|
if (this[kImpl].destroyed) {
|
|
return
|
|
}
|
|
|
|
this[kImpl].ending = true
|
|
end(this)
|
|
}
|
|
|
|
flush (cb) {
|
|
if (this[kImpl].destroyed) {
|
|
if (typeof cb === 'function') {
|
|
process.nextTick(cb, new Error('the worker has exited'))
|
|
}
|
|
return
|
|
}
|
|
|
|
// TODO write all .buf
|
|
const writeIndex = Atomics.load(this[kImpl].state, WRITE_INDEX)
|
|
// process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`)
|
|
wait(this[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => {
|
|
if (err) {
|
|
destroy(this, err)
|
|
process.nextTick(cb, err)
|
|
return
|
|
}
|
|
if (res === 'not-equal') {
|
|
// TODO handle deadlock
|
|
this.flush(cb)
|
|
return
|
|
}
|
|
process.nextTick(cb)
|
|
})
|
|
}
|
|
|
|
flushSync () {
|
|
if (this[kImpl].destroyed) {
|
|
return
|
|
}
|
|
|
|
writeSync(this)
|
|
flushSync(this)
|
|
}
|
|
|
|
unref () {
|
|
this.worker.unref()
|
|
}
|
|
|
|
ref () {
|
|
this.worker.ref()
|
|
}
|
|
|
|
get ready () {
|
|
return this[kImpl].ready
|
|
}
|
|
|
|
get destroyed () {
|
|
return this[kImpl].destroyed
|
|
}
|
|
|
|
get closed () {
|
|
return this[kImpl].closed
|
|
}
|
|
|
|
get writable () {
|
|
return !this[kImpl].destroyed && !this[kImpl].ending
|
|
}
|
|
|
|
get writableEnded () {
|
|
return this[kImpl].ending
|
|
}
|
|
|
|
get writableFinished () {
|
|
return this[kImpl].finished
|
|
}
|
|
|
|
get writableNeedDrain () {
|
|
return this[kImpl].needDrain
|
|
}
|
|
|
|
get writableObjectMode () {
|
|
return false
|
|
}
|
|
|
|
get writableErrored () {
|
|
return this[kImpl].errored
|
|
}
|
|
}
|
|
|
|
function error (stream, err) {
|
|
setImmediate(() => {
|
|
stream.emit('error', err)
|
|
})
|
|
}
|
|
|
|
function destroy (stream, err) {
|
|
if (stream[kImpl].destroyed) {
|
|
return
|
|
}
|
|
stream[kImpl].destroyed = true
|
|
|
|
if (err) {
|
|
stream[kImpl].errored = err
|
|
error(stream, err)
|
|
}
|
|
|
|
if (!stream.worker.exited) {
|
|
stream.worker.terminate()
|
|
.catch(() => {})
|
|
.then(() => {
|
|
stream[kImpl].closed = true
|
|
stream.emit('close')
|
|
})
|
|
} else {
|
|
setImmediate(() => {
|
|
stream[kImpl].closed = true
|
|
stream.emit('close')
|
|
})
|
|
}
|
|
}
|
|
|
|
function write (stream, data, cb) {
|
|
// data is smaller than the shared buffer length
|
|
const current = Atomics.load(stream[kImpl].state, WRITE_INDEX)
|
|
const length = Buffer.byteLength(data)
|
|
stream[kImpl].data.write(data, current)
|
|
Atomics.store(stream[kImpl].state, WRITE_INDEX, current + length)
|
|
Atomics.notify(stream[kImpl].state, WRITE_INDEX)
|
|
cb()
|
|
return true
|
|
}
|
|
|
|
function end (stream) {
|
|
if (stream[kImpl].ended || !stream[kImpl].ending || stream[kImpl].flushing) {
|
|
return
|
|
}
|
|
stream[kImpl].ended = true
|
|
|
|
try {
|
|
stream.flushSync()
|
|
|
|
let readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)
|
|
|
|
// process._rawDebug('writing index')
|
|
Atomics.store(stream[kImpl].state, WRITE_INDEX, -1)
|
|
// process._rawDebug(`(end) readIndex (${Atomics.load(stream.state, READ_INDEX)}) writeIndex (${Atomics.load(stream.state, WRITE_INDEX)})`)
|
|
Atomics.notify(stream[kImpl].state, WRITE_INDEX)
|
|
|
|
// Wait for the process to complete
|
|
let spins = 0
|
|
while (readIndex !== -1) {
|
|
// process._rawDebug(`read = ${read}`)
|
|
Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000)
|
|
readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)
|
|
|
|
if (readIndex === -2) {
|
|
destroy(stream, new Error('end() failed'))
|
|
return
|
|
}
|
|
|
|
if (++spins === 10) {
|
|
destroy(stream, new Error('end() took too long (10s)'))
|
|
return
|
|
}
|
|
}
|
|
|
|
process.nextTick(() => {
|
|
stream[kImpl].finished = true
|
|
stream.emit('finish')
|
|
})
|
|
} catch (err) {
|
|
destroy(stream, err)
|
|
}
|
|
// process._rawDebug('end finished...')
|
|
}
|
|
|
|
function writeSync (stream) {
|
|
const cb = () => {
|
|
if (stream[kImpl].ending) {
|
|
end(stream)
|
|
} else if (stream[kImpl].needDrain) {
|
|
process.nextTick(drain, stream)
|
|
}
|
|
}
|
|
stream[kImpl].flushing = false
|
|
|
|
while (stream[kImpl].buf.length !== 0) {
|
|
const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
|
|
let leftover = stream[kImpl].data.length - writeIndex
|
|
if (leftover === 0) {
|
|
flushSync(stream)
|
|
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
|
|
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
|
|
continue
|
|
} else if (leftover < 0) {
|
|
// stream should never happen
|
|
throw new Error('overwritten')
|
|
}
|
|
|
|
let toWrite = stream[kImpl].buf.slice(0, leftover)
|
|
let toWriteBytes = Buffer.byteLength(toWrite)
|
|
if (toWriteBytes <= leftover) {
|
|
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
|
|
// process._rawDebug('writing ' + toWrite.length)
|
|
write(stream, toWrite, cb)
|
|
} else {
|
|
// multi-byte utf-8
|
|
flushSync(stream)
|
|
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
|
|
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
|
|
|
|
// Find a toWrite length that fits the buffer
|
|
// it must exists as the buffer is at least 4 bytes length
|
|
// and the max utf-8 length for a char is 4 bytes.
|
|
while (toWriteBytes > stream[kImpl].buf.length) {
|
|
leftover = leftover / 2
|
|
toWrite = stream[kImpl].buf.slice(0, leftover)
|
|
toWriteBytes = Buffer.byteLength(toWrite)
|
|
}
|
|
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
|
|
write(stream, toWrite, cb)
|
|
}
|
|
}
|
|
}
|
|
|
|
function flushSync (stream) {
|
|
if (stream[kImpl].flushing) {
|
|
throw new Error('unable to flush while flushing')
|
|
}
|
|
|
|
// process._rawDebug('flushSync started')
|
|
|
|
const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
|
|
|
|
let spins = 0
|
|
|
|
// TODO handle deadlock
|
|
while (true) {
|
|
const readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)
|
|
|
|
if (readIndex === -2) {
|
|
throw Error('_flushSync failed')
|
|
}
|
|
|
|
// process._rawDebug(`(flushSync) readIndex (${readIndex}) writeIndex (${writeIndex})`)
|
|
if (readIndex !== writeIndex) {
|
|
// TODO stream timeouts for some reason.
|
|
Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000)
|
|
} else {
|
|
break
|
|
}
|
|
|
|
if (++spins === 10) {
|
|
throw new Error('_flushSync took too long (10s)')
|
|
}
|
|
}
|
|
// process._rawDebug('flushSync finished')
|
|
}
|
|
|
|
module.exports = ThreadStream
|