You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					209 lines
				
				4.6 KiB
			
		
		
			
		
	
	
					209 lines
				
				4.6 KiB
			| 
											3 years ago
										 | var util = require('util'); | ||
|  | var Stream = require('stream').Stream; | ||
|  | var DelayedStream = require('delayed-stream'); | ||
|  | 
 | ||
|  | module.exports = CombinedStream; | ||
|  | function CombinedStream() { | ||
|  |   this.writable = false; | ||
|  |   this.readable = true; | ||
|  |   this.dataSize = 0; | ||
|  |   this.maxDataSize = 2 * 1024 * 1024; | ||
|  |   this.pauseStreams = true; | ||
|  | 
 | ||
|  |   this._released = false; | ||
|  |   this._streams = []; | ||
|  |   this._currentStream = null; | ||
|  |   this._insideLoop = false; | ||
|  |   this._pendingNext = false; | ||
|  | } | ||
|  | util.inherits(CombinedStream, Stream); | ||
|  | 
 | ||
|  | CombinedStream.create = function(options) { | ||
|  |   var combinedStream = new this(); | ||
|  | 
 | ||
|  |   options = options || {}; | ||
|  |   for (var option in options) { | ||
|  |     combinedStream[option] = options[option]; | ||
|  |   } | ||
|  | 
 | ||
|  |   return combinedStream; | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.isStreamLike = function(stream) { | ||
|  |   return (typeof stream !== 'function') | ||
|  |     && (typeof stream !== 'string') | ||
|  |     && (typeof stream !== 'boolean') | ||
|  |     && (typeof stream !== 'number') | ||
|  |     && (!Buffer.isBuffer(stream)); | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype.append = function(stream) { | ||
|  |   var isStreamLike = CombinedStream.isStreamLike(stream); | ||
|  | 
 | ||
|  |   if (isStreamLike) { | ||
|  |     if (!(stream instanceof DelayedStream)) { | ||
|  |       var newStream = DelayedStream.create(stream, { | ||
|  |         maxDataSize: Infinity, | ||
|  |         pauseStream: this.pauseStreams, | ||
|  |       }); | ||
|  |       stream.on('data', this._checkDataSize.bind(this)); | ||
|  |       stream = newStream; | ||
|  |     } | ||
|  | 
 | ||
|  |     this._handleErrors(stream); | ||
|  | 
 | ||
|  |     if (this.pauseStreams) { | ||
|  |       stream.pause(); | ||
|  |     } | ||
|  |   } | ||
|  | 
 | ||
|  |   this._streams.push(stream); | ||
|  |   return this; | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype.pipe = function(dest, options) { | ||
|  |   Stream.prototype.pipe.call(this, dest, options); | ||
|  |   this.resume(); | ||
|  |   return dest; | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype._getNext = function() { | ||
|  |   this._currentStream = null; | ||
|  | 
 | ||
|  |   if (this._insideLoop) { | ||
|  |     this._pendingNext = true; | ||
|  |     return; // defer call
 | ||
|  |   } | ||
|  | 
 | ||
|  |   this._insideLoop = true; | ||
|  |   try { | ||
|  |     do { | ||
|  |       this._pendingNext = false; | ||
|  |       this._realGetNext(); | ||
|  |     } while (this._pendingNext); | ||
|  |   } finally { | ||
|  |     this._insideLoop = false; | ||
|  |   } | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype._realGetNext = function() { | ||
|  |   var stream = this._streams.shift(); | ||
|  | 
 | ||
|  | 
 | ||
|  |   if (typeof stream == 'undefined') { | ||
|  |     this.end(); | ||
|  |     return; | ||
|  |   } | ||
|  | 
 | ||
|  |   if (typeof stream !== 'function') { | ||
|  |     this._pipeNext(stream); | ||
|  |     return; | ||
|  |   } | ||
|  | 
 | ||
|  |   var getStream = stream; | ||
|  |   getStream(function(stream) { | ||
|  |     var isStreamLike = CombinedStream.isStreamLike(stream); | ||
|  |     if (isStreamLike) { | ||
|  |       stream.on('data', this._checkDataSize.bind(this)); | ||
|  |       this._handleErrors(stream); | ||
|  |     } | ||
|  | 
 | ||
|  |     this._pipeNext(stream); | ||
|  |   }.bind(this)); | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype._pipeNext = function(stream) { | ||
|  |   this._currentStream = stream; | ||
|  | 
 | ||
|  |   var isStreamLike = CombinedStream.isStreamLike(stream); | ||
|  |   if (isStreamLike) { | ||
|  |     stream.on('end', this._getNext.bind(this)); | ||
|  |     stream.pipe(this, {end: false}); | ||
|  |     return; | ||
|  |   } | ||
|  | 
 | ||
|  |   var value = stream; | ||
|  |   this.write(value); | ||
|  |   this._getNext(); | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype._handleErrors = function(stream) { | ||
|  |   var self = this; | ||
|  |   stream.on('error', function(err) { | ||
|  |     self._emitError(err); | ||
|  |   }); | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype.write = function(data) { | ||
|  |   this.emit('data', data); | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype.pause = function() { | ||
|  |   if (!this.pauseStreams) { | ||
|  |     return; | ||
|  |   } | ||
|  | 
 | ||
|  |   if(this.pauseStreams && this._currentStream && typeof(this._currentStream.pause) == 'function') this._currentStream.pause(); | ||
|  |   this.emit('pause'); | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype.resume = function() { | ||
|  |   if (!this._released) { | ||
|  |     this._released = true; | ||
|  |     this.writable = true; | ||
|  |     this._getNext(); | ||
|  |   } | ||
|  | 
 | ||
|  |   if(this.pauseStreams && this._currentStream && typeof(this._currentStream.resume) == 'function') this._currentStream.resume(); | ||
|  |   this.emit('resume'); | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype.end = function() { | ||
|  |   this._reset(); | ||
|  |   this.emit('end'); | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype.destroy = function() { | ||
|  |   this._reset(); | ||
|  |   this.emit('close'); | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype._reset = function() { | ||
|  |   this.writable = false; | ||
|  |   this._streams = []; | ||
|  |   this._currentStream = null; | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype._checkDataSize = function() { | ||
|  |   this._updateDataSize(); | ||
|  |   if (this.dataSize <= this.maxDataSize) { | ||
|  |     return; | ||
|  |   } | ||
|  | 
 | ||
|  |   var message = | ||
|  |     'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.'; | ||
|  |   this._emitError(new Error(message)); | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype._updateDataSize = function() { | ||
|  |   this.dataSize = 0; | ||
|  | 
 | ||
|  |   var self = this; | ||
|  |   this._streams.forEach(function(stream) { | ||
|  |     if (!stream.dataSize) { | ||
|  |       return; | ||
|  |     } | ||
|  | 
 | ||
|  |     self.dataSize += stream.dataSize; | ||
|  |   }); | ||
|  | 
 | ||
|  |   if (this._currentStream && this._currentStream.dataSize) { | ||
|  |     this.dataSize += this._currentStream.dataSize; | ||
|  |   } | ||
|  | }; | ||
|  | 
 | ||
|  | CombinedStream.prototype._emitError = function(err) { | ||
|  |   this._reset(); | ||
|  |   this.emit('error', err); | ||
|  | }; |