You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					145 lines
				
				3.9 KiB
			
		
		
			
		
	
	
					145 lines
				
				3.9 KiB
			| 
											2 years ago
										 | var Stream = require('stream'); | ||
|  | var Promise = require('bluebird'); | ||
|  | var util = require('util'); | ||
|  | var Buffer = require('./Buffer'); | ||
|  | var strFunction = 'function'; | ||
|  | 
 | ||
|  | // Backwards compatibility for node versions < 8
 | ||
|  | if (!Stream.Writable || !Stream.Writable.prototype.destroy) | ||
|  |   Stream = require('readable-stream'); | ||
|  | 
 | ||
// PullStream is a Duplex stream whose writable side accumulates incoming
// chunks in `this.buffer`; consumers then carve data out of that buffer via
// the `stream()` and `pull()` prototype methods.
//
// Can be called with or without `new`.
function PullStream() {
  if (!(this instanceof PullStream))
    return new PullStream();

  Stream.Duplex.call(this, {decodeStrings: false, objectMode: true});

  // Bytes written so far that have not yet been handed to a consumer.
  this.buffer = Buffer.from('');

  var self = this;
  self.on('finish', function() {
    // Record that no more data will arrive, then wake any 'chunk' listener
    // one last time so it can notice the end of input.
    self.finished = true;
    self.emit('chunk', false);
  });
}

util.inherits(PullStream, Stream.Duplex);

// Writable-side handler: appends `chunk` to the internal buffer and announces
// it with a 'chunk' event.
//
// The write callback is NOT invoked here. It is parked on `this.cb` so that
// whoever consumes the buffer decides when the next write may proceed.
PullStream.prototype._write = function(chunk, e, cb) {
  this.buffer = Buffer.concat([this.buffer, chunk]);
  this.cb = cb;
  this.emit('chunk');
};

// The `eof` parameter is interpreted as `file_length` if the type is number
// otherwise (i.e. buffer) it is interpreted as a pattern signaling end of stream
//
// Returns a PassThrough stream that receives data taken from the front of
// `this.buffer` until either `eof` bytes have been delivered (number) or the
// `eof` pattern is found (pattern). With `includeEof` truthy the pattern
// itself is delivered as well; otherwise it is left at the head of the buffer.
// On a pattern match the byte offset of the match is stored on `this.match`.
//
// If the input finishes before the end condition is met, an 'error' event
// with message 'FILE_ENDED' is emitted on the PullStream itself.
PullStream.prototype.stream = function(eof, includeEof) {
  var p = new Stream.PassThrough();
  var done, self = this;

  // Invoke (once) the write callback parked by `_write`, if there is one,
  // allowing the next upstream write to proceed.
  function cb() {
    if (typeof self.cb === 'function') {
      var callback = self.cb;
      self.cb = undefined;
      return callback();
    }
  }

  // Runs once immediately and then on every 'chunk' event.
  function pull() {
    var packet;
    if (self.buffer && self.buffer.length) {
      if (typeof eof === 'number') {
        // Length mode: take up to `eof` bytes; `eof` counts down the remainder.
        packet = self.buffer.slice(0, eof);
        self.buffer = self.buffer.slice(eof);
        eof -= packet.length;
        done = !eof;
      } else {
        var match = self.buffer.indexOf(eof);
        if (match !== -1) {
          // store signature match byte offset to allow us to reference
          // this for zip64 offset
          self.match = match;
          if (includeEof) match = match + eof.length;
          packet = self.buffer.slice(0, match);
          self.buffer = self.buffer.slice(match);
          done = true;
        } else {
          // No match yet: hold back the last `eof.length` bytes, since the
          // pattern may straddle this chunk and the next one.
          var len = self.buffer.length - eof.length;
          if (len <= 0) {
            cb();
          } else {
            packet = self.buffer.slice(0, len);
            self.buffer = self.buffer.slice(len);
          }
        }
      }
      if (packet) p.write(packet, function() {
        // Ask for more input only once the buffer is drained (or holds no
        // more than a potential partial pattern).
        if (self.buffer.length === 0 || (eof.length && self.buffer.length <= eof.length)) cb();
      });
    }

    if (!done) {
      if (self.finished) {
        self.removeListener('chunk', pull);
        self.emit('error', new Error('FILE_ENDED'));
        return;
      }
    } else {
      self.removeListener('chunk', pull);
      p.end();
    }
  }

  self.on('chunk', pull);
  pull();
  return p;
};

// Resolves with the data up to `eof`, collected into a single Buffer.
//
// `eof` follows the same convention as `stream()`: a number is a byte count,
// anything else is a pattern marking the end of the data. `includeEof` is
// forwarded to `stream()`.
//
// Resolves with '' when `eof === 0`. Rejects with Error('FILE_ENDED') when the
// input has already finished, and with any 'error' emitted by the PullStream
// or by the intermediate streams while collecting.
PullStream.prototype.pull = function(eof, includeEof) {
  if (eof === 0) return Promise.resolve('');

  // If we already have the required data in buffer
  // we can resolve the request immediately.
  // The check is on the *type*: a coercing test such as isNaN() would treat a
  // numeric-looking pattern (e.g. Buffer.from('1')) as a byte count.
  if (typeof eof === 'number' && this.buffer.length > eof) {
    var data = this.buffer.slice(0, eof);
    this.buffer = this.buffer.slice(eof);
    return Promise.resolve(data);
  }

  // Otherwise we stream until we have it
  var buffer = Buffer.from(''),
      self = this;

  // Sink that concatenates everything it receives into `buffer`.
  var concatStream = new Stream.Transform();
  concatStream._transform = function(d, e, cb) {
    buffer = Buffer.concat([buffer, d]);
    cb();
  };

  var pullStreamRejectHandler;
  return new Promise(function(resolve, reject) {
    pullStreamRejectHandler = function(e) {
      // Remember the error on the instance before rejecting.
      self.__emittedError = e;
      reject(e);
    };
    if (self.finished)
      return reject(new Error('FILE_ENDED'));
    self.once('error', pullStreamRejectHandler);  // reject any errors from pullstream itself
    self.stream(eof, includeEof)
      .on('error', reject)
      .pipe(concatStream)
      .on('finish', function() { resolve(buffer); })
      .on('error', reject);
  })
  .finally(function() {
    // Detach the temporary error listener whether we resolved or rejected.
    self.removeListener('error', pullStreamRejectHandler);
  });
};

// Readable-side `_read` is intentionally a no-op: consumers obtain data
// through `stream()` / `pull()` rather than by reading from the Duplex itself.
PullStream.prototype._read = function() {};

module.exports = PullStream;