"use strict";
|
|
|
|
// Drive one step of a generator-based async function: invoke gen.next/.throw,
// then either settle the outer promise (done) or chain onto the yielded value.
function asyncGeneratorStep(gen, resolve, reject, _next, _throw, key, arg) {
  var info;
  var value;
  try {
    info = gen[key](arg);
    value = info.value;
  } catch (error) {
    // The generator body threw synchronously — fail the outer promise.
    reject(error);
    return;
  }
  if (info.done) {
    // Generator ran to completion; its return value settles the promise.
    resolve(value);
  } else {
    // Await whatever was yielded, then keep stepping (or propagate failure).
    Promise.resolve(value).then(_next, _throw);
  }
}
|
|
|
|
// Wrap a generator-producing function so that calling it returns a Promise
// which settles when the generator (stepped via asyncGeneratorStep) finishes.
// `this` and arguments are forwarded unchanged to the generator function.
function _asyncToGenerator(fn) {
  return function () {
    var self = this;
    var args = arguments;
    return new Promise(function (resolve, reject) {
      var gen = fn.apply(self, args);
      var _next = function (value) {
        asyncGeneratorStep(gen, resolve, reject, _next, _throw, "next", value);
      };
      var _throw = function (err) {
        asyncGeneratorStep(gen, resolve, reject, _next, _throw, "throw", err);
      };
      // Kick off the first step.
      _next(undefined);
    });
  };
}
|
|
|
|
// Transpiler guard: transpiled classes must be constructed with `new`.
function _classCallCheck(instance, Constructor) {
  var calledWithNew = instance instanceof Constructor;
  if (!calledWithNew) {
    throw new TypeError("Cannot call a class as a function");
  }
}
|
|
|
|
// Install each descriptor in `props` onto `target`: configurable, enumerable
// only when explicitly requested, and writable for data descriptors.
function _defineProperties(target, props) {
  for (var i = 0; i < props.length; i += 1) {
    var descriptor = props[i];
    descriptor.enumerable = descriptor.enumerable || false;
    descriptor.configurable = true;
    if ("value" in descriptor) {
      descriptor.writable = true;
    }
    Object.defineProperty(target, descriptor.key, descriptor);
  }
}
|
|
|
|
// Attach instance members to Constructor.prototype and static members to the
// constructor itself, then return the constructor for assignment.
function _createClass(Constructor, protoProps, staticProps) {
  if (protoProps) {
    _defineProperties(Constructor.prototype, protoProps);
  }
  if (staticProps) {
    _defineProperties(Constructor, staticProps);
  }
  return Constructor;
}
|
|
|
|
/* eslint-disable max-classes-per-file */
|
|
var Stream = require('readable-stream');
|
|
|
|
var utils = require('./utils');
|
|
|
|
var StringBuf = require('./string-buf'); // =============================================================================
|
|
// data chunks - encapsulating incoming data
|
|
|
|
|
|
// Chunk wrapper for plain strings: holds the string plus its encoding and
// converts to a Buffer lazily, caching the result in this._buffer.
var StringChunk = /*#__PURE__*/function () {
  function StringChunk(data, encoding) {
    _classCallCheck(this, StringChunk);
    this._data = data;
    this._encoding = encoding;
  }

  _createClass(StringChunk, [{
    key: "length",
    // Byte length of the encoded string (forces the lazy conversion).
    get: function get() {
      return this.toBuffer().length;
    }
  }, {
    key: "copy",
    // copy to target buffer
    value: function copy(target, targetOffset, offset, length) {
      return this.toBuffer().copy(target, targetOffset, offset, length);
    }
  }, {
    key: "toBuffer",
    // Encode once and cache; subsequent calls return the same Buffer.
    value: function toBuffer() {
      if (!this._buffer) {
        this._buffer = Buffer.from(this._data, this._encoding);
      }
      return this._buffer;
    }
  }]);

  return StringChunk;
}();
|
|
|
|
// Chunk wrapper that adapts a StringBuf instance to the common chunk
// interface (length / copy / toBuffer) used by StreamBuf.
var StringBufChunk = /*#__PURE__*/function () {
  function StringBufChunk(data) {
    _classCallCheck(this, StringBufChunk);
    this._data = data;
  }

  _createClass(StringBufChunk, [{
    key: "length",
    get: function get() {
      return this._data.length;
    }
  }, {
    key: "copy",
    // copy to target buffer
    value: function copy(target, targetOffset, offset, length) {
      // Reach into the StringBuf's backing buffer directly to avoid a copy.
      // eslint-disable-next-line no-underscore-dangle
      return this._data._buf.copy(target, targetOffset, offset, length);
    }
  }, {
    key: "toBuffer",
    value: function toBuffer() {
      return this._data.toBuffer();
    }
  }]);

  return StringBufChunk;
}();
|
|
|
|
// Chunk wrapper around a Buffer that is already in its final form; all
// operations delegate straight to the wrapped Buffer.
var BufferChunk = /*#__PURE__*/function () {
  function BufferChunk(data) {
    _classCallCheck(this, BufferChunk);
    this._data = data;
  }

  _createClass(BufferChunk, [{
    key: "length",
    get: function get() {
      return this._data.length;
    }
  }, {
    key: "copy",
    // copy to target buffer (note: returns undefined, unlike the other chunks)
    value: function copy(target, targetOffset, offset, length) {
      this._data.copy(target, targetOffset, offset, length);
    }
  }, {
    key: "toBuffer",
    value: function toBuffer() {
      return this._data;
    }
  }]);

  return BufferChunk;
}();
|
|
// ReadWriteBuf - a single buffer supporting simple read-write
|
|
|
|
|
|
// =============================================================================
// ReadWriteBuf - a single fixed-size buffer with independent read (iRead) and
// write (iWrite) cursors; 0 <= iRead <= iWrite <= size at all times.
var ReadWriteBuf = /*#__PURE__*/function () {
  function ReadWriteBuf(size) {
    _classCallCheck(this, ReadWriteBuf);

    // total capacity in bytes
    this.size = size;
    // the backing buffer
    this.buffer = Buffer.alloc(size);
    // read index
    this.iRead = 0;
    // write index
    this.iWrite = 0;
  }

  _createClass(ReadWriteBuf, [{
    key: "toBuffer",
    // Return the unread span [iRead, iWrite). When the whole buffer is both
    // full and unread, return the backing buffer itself to avoid a copy.
    value: function toBuffer() {
      if (this.iRead === 0 && this.iWrite === this.size) {
        return this.buffer;
      }

      var buf = Buffer.alloc(this.iWrite - this.iRead);
      this.buffer.copy(buf, 0, this.iRead, this.iWrite);
      return buf;
    }
  }, {
    key: "read",
    // Read `size` bytes (all remaining bytes when size is undefined or exceeds
    // what is available) and advance the read cursor. Returns null for size 0.
    value: function read(size) {
      var buf;

      if (size === 0) {
        // special case - return null if no data requested
        return null;
      }

      if (size === undefined || size >= this.length) {
        // if no size specified or size is at least what we have then return all of the bytes
        buf = this.toBuffer();
        this.iRead = this.iWrite;
        return buf;
      }

      // otherwise return a chunk of `size` bytes starting at the read cursor.
      buf = Buffer.alloc(size);
      // BUGFIX: the source-end argument must be iRead + size, not the absolute
      // value `size`; the old code returned zero-filled/stale data on any
      // partial read after the first (sourceStart >= sourceEnd copies nothing).
      this.buffer.copy(buf, 0, this.iRead, this.iRead + size);
      this.iRead += size;
      return buf;
    }
  }, {
    key: "write",
    // Copy up to `length` bytes from `chunk` (starting at `offset`) into the
    // free tail of this buffer; returns the number of bytes actually written.
    value: function write(chunk, offset, length) {
      var size = Math.min(length, this.size - this.iWrite);
      chunk.copy(this.buffer, this.iWrite, offset, offset + size);
      this.iWrite += size;
      return size;
    }
  }, {
    key: "length",
    // Number of unread bytes.
    get: function get() {
      return this.iWrite - this.iRead;
    }
  }, {
    key: "eod",
    // End-of-data: everything written has been read.
    get: function get() {
      return this.iRead === this.iWrite;
    }
  }, {
    key: "full",
    // No free space left for writing.
    get: function get() {
      return this.iWrite === this.size;
    }
  }]);

  return ReadWriteBuf;
}();
|
|
// StreamBuf - a multi-purpose read-write stream
|
|
// As MemBuf - write as much data as you like. Then call toBuffer() to consolidate
|
|
// As StreamHub - pipe to multiple writables
|
|
// As readable stream - feed data into the writable part and have some other code read from it.
|
|
// Note: Not sure why but StreamBuf does not like JS "class" sugar. It fails the
|
|
// integration tests
|
|
|
|
|
|
// StreamBuf constructor: initialises all state for the multi-purpose
// read/write stream. Methods are installed separately via utils.inherits.
var StreamBuf = function StreamBuf(options) {
  var opts = options || {};

  // size of each internal ReadWriteBuf (default 1MiB)
  this.bufSize = opts.bufSize || 1024 * 1024;
  // queue of ReadWriteBuf instances holding unconsumed data
  this.buffers = [];
  // batch mode fills a buffer completely before passing the data on
  // to pipes or 'readable' event listeners
  this.batch = opts.batch || false;
  this.corked = false;
  // where in the current writable buffer we're up to
  this.inPos = 0;
  // where in the current readable buffer we've read up to
  this.outPos = 0;
  // consuming pipe streams go here
  this.pipes = [];
  // controls emit('data')
  this.paused = false;
  this.encoding = null;
};
|
|
|
|
// Wire StreamBuf up as a Stream.Duplex subclass and install its methods.
// NOTE(review): `_pipe` and `write` are transpiled async functions and rely on
// a global `regeneratorRuntime` being available before they are called.
utils.inherits(StreamBuf, Stream.Duplex, {
  // Consolidate all buffered data into a single Buffer; null when empty.
  toBuffer: function toBuffer() {
    switch (this.buffers.length) {
      case 0:
        return null;

      case 1:
        return this.buffers[0].toBuffer();

      default:
        return Buffer.concat(this.buffers.map(function (rwBuf) {
          return rwBuf.toBuffer();
        }));
    }
  },
  // writable
  // event drain - if write returns false (which it won't), indicates when safe to write again.
  // finish - end() has been called
  // pipe(src) - pipe() has been called on readable
  // unpipe(src) - unpipe() has been called on readable
  // error - duh

  // Return the last queued buffer if it still has free space; otherwise
  // allocate a fresh ReadWriteBuf of bufSize and append it to the queue.
  _getWritableBuffer: function _getWritableBuffer() {
    if (this.buffers.length) {
      var last = this.buffers[this.buffers.length - 1];

      if (!last.full) {
        return last;
      }
    }

    var buf = new ReadWriteBuf(this.bufSize);
    this.buffers.push(buf);
    return buf;
  },
  // Write one chunk to every registered pipe; the returned promise resolves
  // once every destination's write callback has fired.
  _pipe: function _pipe(chunk) {
    var _this = this;

    return _asyncToGenerator( /*#__PURE__*/regeneratorRuntime.mark(function _callee() {
      var write;
      return regeneratorRuntime.wrap(function _callee$(_context) {
        while (1) {
          switch (_context.prev = _context.next) {
            case 0:
              // promisified destination write: resolves on the write callback
              write = function write(pipe) {
                return new Promise(function (resolve) {
                  pipe.write(chunk.toBuffer(), function () {
                    resolve();
                  });
                });
              };

              _context.next = 3;
              return Promise.all(_this.pipes.map(write));

            case 3:
            case "end":
              return _context.stop();
          }
        }
      }, _callee);
    }))();
  },
  // Copy the chunk's bytes into the ReadWriteBuf queue, rolling over to a
  // fresh buffer each time the current one fills.
  _writeToBuffers: function _writeToBuffers(chunk) {
    var inPos = 0;
    var inLen = chunk.length;

    while (inPos < inLen) {
      // find writable buffer
      var buffer = this._getWritableBuffer(); // write some data

      inPos += buffer.write(chunk, inPos, inLen - inPos);
    }
  },
  // Duplex write(): wraps `data` (String/StringBuf/Buffer/ArrayBuffer) in the
  // matching chunk type, then either forwards it to pipes or stores it for
  // later read()s / 'data' listeners.
  // NOTE(review): being a transpiled async function this returns a Promise,
  // not the boolean the standard stream contract describes — confirm callers
  // treat it as fire-and-forget.
  write: function write(data, encoding, callback) {
    var _this2 = this;

    return _asyncToGenerator( /*#__PURE__*/regeneratorRuntime.mark(function _callee2() {
      var chunk;
      return regeneratorRuntime.wrap(function _callee2$(_context2) {
        while (1) {
          switch (_context2.prev = _context2.next) {
            case 0:
              // write(data, callback) form — default the encoding
              if (encoding instanceof Function) {
                callback = encoding;
                encoding = 'utf8';
              }

              callback = callback || utils.nop; // encapsulate data into a chunk

              if (!(data instanceof StringBuf)) {
                _context2.next = 6;
                break;
              }

              chunk = new StringBufChunk(data);
              _context2.next = 15;
              break;

            case 6:
              if (!(data instanceof Buffer)) {
                _context2.next = 10;
                break;
              }

              chunk = new BufferChunk(data);
              _context2.next = 15;
              break;

            case 10:
              if (!(typeof data === 'string' || data instanceof String || data instanceof ArrayBuffer)) {
                _context2.next = 14;
                break;
              }

              chunk = new StringChunk(data, encoding);
              _context2.next = 15;
              break;

            case 14:
              throw new Error('Chunk must be one of type String, Buffer or StringBuf.');

            case 15:
              // pipe mode: forward the chunk to consumers
              if (!_this2.pipes.length) {
                _context2.next = 31;
                break;
              }

              if (!_this2.batch) {
                _context2.next = 21;
                break;
              }

              // batch mode: accumulate, then flush every completely filled buffer
              _this2._writeToBuffers(chunk);

              while (!_this2.corked && _this2.buffers.length > 1) {
                _this2._pipe(_this2.buffers.shift());
              }

              _context2.next = 29;
              break;

            case 21:
              if (_this2.corked) {
                _context2.next = 27;
                break;
              }

              // uncorked: pipe the chunk straight through, then signal done
              _context2.next = 24;
              return _this2._pipe(chunk);

            case 24:
              callback();
              _context2.next = 29;
              break;

            case 27:
              // corked: hold the data until uncork() flushes it
              _this2._writeToBuffers(chunk);

              process.nextTick(callback);

            case 29:
              _context2.next = 34;
              break;

            case 31:
              // no pipes: behave like a readable buffer / event emitter
              if (!_this2.paused) {
                _this2.emit('data', chunk.toBuffer());
              }

              _this2._writeToBuffers(chunk);

              _this2.emit('readable');

            case 34:
              return _context2.abrupt("return", true);

            case 35:
            case "end":
              return _context2.stop();
          }
        }
      }, _callee2);
    }))();
  },
  // Suspend piping; written data accumulates until uncork().
  cork: function cork() {
    this.corked = true;
  },
  // Push all buffered data out to the pipes.
  // NOTE(review): fire-and-forget — the promises returned by _pipe are not
  // awaited here.
  _flush: function _flush()
  /* destination */
  {
    // if we have consumers...
    if (this.pipes.length) {
      // and there's stuff not written
      while (this.buffers.length) {
        this._pipe(this.buffers.shift());
      }
    }
  },
  // Resume piping and immediately flush anything accumulated while corked.
  uncork: function uncork() {
    this.corked = false;

    this._flush();
  },
  // Finish the stream: optionally write a final chunk, flush to pipes, end
  // every pipe, then emit 'finish'.
  end: function end(chunk, encoding, callback) {
    var _this3 = this;

    var writeComplete = function writeComplete(error) {
      if (error) {
        callback(error);
      } else {
        _this3._flush();

        _this3.pipes.forEach(function (pipe) {
          pipe.end();
        });

        _this3.emit('finish');
      }
    };

    if (chunk) {
      this.write(chunk, encoding, writeComplete);
    } else {
      writeComplete();
    }
  },
  // readable
  // event readable - some data is now available
  // event data - switch to flowing mode - feeds chunks to handler
  // event end - no more data
  // event close - optional, indicates upstream close
  // event error - duh

  // Read up to `size` bytes (everything when size is falsy), consuming
  // ReadWriteBufs from the front of the queue as they are exhausted.
  read: function read(size) {
    var buffers; // read min(buffer, size || infinity)

    if (size) {
      buffers = [];

      while (size && this.buffers.length && !this.buffers[0].eod) {
        var first = this.buffers[0];
        var buffer = first.read(size);
        size -= buffer.length;
        buffers.push(buffer);

        // only discard a drained buffer when it can take no more writes
        if (first.eod && first.full) {
          this.buffers.shift();
        }
      }

      return Buffer.concat(buffers);
    }

    buffers = this.buffers.map(function (buf) {
      return buf.toBuffer();
    }).filter(Boolean);
    this.buffers = [];
    return Buffer.concat(buffers);
  },
  setEncoding: function setEncoding(encoding) {
    // causes stream.read or stream.on('data') to return strings of encoding instead of Buffer objects
    // NOTE(review): this.encoding is stored but not consulted by read()/write()
    // in this file — confirm whether it is honoured elsewhere.
    this.encoding = encoding;
  },
  pause: function pause() {
    this.paused = true;
  },
  resume: function resume() {
    this.paused = false;
  },
  isPaused: function isPaused() {
    return !!this.paused;
  },
  pipe: function pipe(destination) {
    // add destination to pipe list & write current buffer
    this.pipes.push(destination);

    // NOTE(review): this calls end(), terminating the stream after flushing —
    // looks intentional for this module's one-shot usage; confirm.
    if (!this.paused && this.buffers.length) {
      this.end();
    }
  },
  unpipe: function unpipe(destination) {
    // remove destination from pipe list
    this.pipes = this.pipes.filter(function (pipe) {
      return pipe !== destination;
    });
  },
  unshift: function unshift()
  /* chunk */
  {
    // some numpty has read some data that's not for them and they want to put it back!
    // Might implement this some day
    throw new Error('Not Implemented');
  },
  wrap: function wrap()
  /* stream */
  {
    // not implemented
    throw new Error('Not Implemented');
  }
});
|
|
module.exports = StreamBuf;
|
|
//# sourceMappingURL=stream-buf.js.map
|