You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					145 lines
				
				3.9 KiB
			
		
		
			
		
	
	
					145 lines
				
				3.9 KiB
			| 
								 
											3 years ago
										 
									 | 
							
								var Stream = require('stream');
							 | 
						||
| 
								 | 
							
								var Promise = require('bluebird');
							 | 
						||
| 
								 | 
							
								var util = require('util');
							 | 
						||
| 
								 | 
							
								var Buffer = require('./Buffer');
							 | 
						||
| 
								 | 
							
								var strFunction = 'function';
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Backwards compatibility for node versions < 8: when the built-in
								// `stream` module has no `Writable.prototype.destroy`, use the
								// `readable-stream` package in its place.
								if (!Stream.Writable || !Stream.Writable.prototype.destroy)
								  Stream = require('readable-stream');
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/**
								 * Duplex stream that collects everything written to it in `this.buffer`
								 * so that the `stream()` / `pull()` prototype methods can hand it out in
								 * caller-defined pieces.
								 *
								 * May be invoked with or without `new`.
								 *
								 * Instance state set up here:
								 *  - `buffer`:   Buffer holding written data that has not been consumed yet
								 *  - `finished`: set to `true` once the writable side emits `finish`
								 */
								function PullStream() {
								  if (!(this instanceof PullStream))
								    return new PullStream();

								  Stream.Duplex.call(this,{decodeStrings:false, objectMode:true});
								  this.buffer = Buffer.from('');
								  var self = this;
								  // When the writable side is done, record it and emit one last `chunk`
								  // event (with `false` as payload) so `chunk` listeners run once more and
								  // can observe `finished`.
								  self.on('finish',function() {
								    self.finished = true;
								    self.emit('chunk',false);
								  });
								}

								util.inherits(PullStream,Stream.Duplex);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/**
								 * Writable-side implementation.
								 *
								 * Appends `chunk` to `this.buffer`, stores the write callback in `this.cb`
								 * instead of calling it, and emits `chunk`. The stored callback is invoked
								 * later by the consumer created in `stream()`, so the next write is held
								 * back until then.
								 *
								 * @param {Buffer} chunk - data to append (passed to `Buffer.concat`)
								 * @param {string} e - encoding argument supplied by the stream machinery (unused)
								 * @param {Function} cb - write-completion callback, deferred via `this.cb`
								 */
								PullStream.prototype._write = function(chunk,e,cb) {
								  this.buffer = Buffer.concat([this.buffer,chunk]);
								  this.cb = cb;
								  this.emit('chunk');
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/**
								 * Returns a PassThrough stream that is fed from `this.buffer` until `eof`
								 * is reached, then ended.
								 *
								 * The `eof` parameter is interpreted as `file_length` if the type is number
								 * otherwise (i.e. buffer) it is interpreted as a pattern signaling end of stream
								 *
								 * If the PullStream has finished before `eof` was reached, an `error` event
								 * with message `FILE_ENDED` is emitted on the PullStream itself.
								 *
								 * @param {number|Buffer} eof - number of bytes to emit, or end-of-data pattern
								 * @param {boolean} [includeEof] - for a pattern `eof`: also emit the pattern itself
								 * @returns {Stream.PassThrough} stream carrying the requested data
								 */
								PullStream.prototype.stream = function(eof,includeEof) {
								  var p = new Stream.PassThrough();
								  var done,self= this;

								  // Invoke (at most once) the write callback parked by `_write`.
								  function cb() {
								    if (typeof self.cb === strFunction) {
								      var callback = self.cb;
								      self.cb = undefined;
								      return callback();
								    }
								  }

								  // Runs once immediately and then on every `chunk` event.
								  function pull() {
								    var packet;
								    if (self.buffer && self.buffer.length) {
								      if (typeof eof === 'number') {
								        // Length mode: take up to `eof` bytes and count down what remains.
								        packet = self.buffer.slice(0,eof);
								        self.buffer = self.buffer.slice(eof);
								        eof -= packet.length;
								        done = !eof;
								      } else {
								        // Pattern mode: look for the end marker in the buffered data.
								        var match = self.buffer.indexOf(eof);
								        if (match !== -1) {
								          // store signature match byte offset to allow us to reference
								          // this for zip64 offset
								          self.match = match;
								          if (includeEof) match = match + eof.length;
								          packet = self.buffer.slice(0,match);
								          self.buffer = self.buffer.slice(match);
								          done = true;
								        } else {
								          // No match yet: keep the last `eof.length` bytes buffered and
								          // forward only what precedes them.
								          var len = self.buffer.length - eof.length;
								          if (len <= 0) {
								            cb();
								          } else {
								            packet = self.buffer.slice(0,len);
								            self.buffer = self.buffer.slice(len);
								          }
								        }
								      }
								      // After the packet is written, release the parked write callback when
								      // the buffer is empty or (pattern mode) no longer than the pattern.
								      if (packet) p.write(packet,function() {
								        if (self.buffer.length === 0 || (eof.length && self.buffer.length <= eof.length)) cb();
								      });
								    }

								    if (!done) {
								      if (self.finished) {
								        self.removeListener('chunk',pull);
								        self.emit('error', new Error('FILE_ENDED'));
								        return;
								      }

								    } else {
								      self.removeListener('chunk',pull);
								      p.end();
								    }
								  }

								  self.on('chunk',pull);
								  pull();
								  return p;
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/**
								 * Resolves with the data up to `eof`, collected into a single Buffer.
								 *
								 * `eof` follows the same convention as in `stream()`: a number is a byte
								 * count, anything else is an end-of-data pattern.
								 *
								 * @param {number|Buffer} eof - number of bytes to read, or end-of-data pattern
								 * @param {boolean} [includeEof] - for a pattern `eof`: include the pattern in the result
								 * @returns {Promise} resolves with the collected Buffer (or `''` when `eof` is 0);
								 *   rejects with `FILE_ENDED` if the stream already finished, or with any
								 *   error emitted by the PullStream or the intermediate streams
								 */
								PullStream.prototype.pull = function(eof,includeEof) {
								  if (eof === 0) return Promise.resolve('');

								  // If we already have the required data in buffer
								  // we can resolve the request immediately.
								  // Only a real number is a byte count; `isNaN` would coerce a Buffer
								  // pattern such as Buffer.from('12') to 12 and wrongly take this path.
								  if (typeof eof === 'number' && this.buffer.length > eof) {
								    var data = this.buffer.slice(0,eof);
								    this.buffer = this.buffer.slice(eof);
								    return Promise.resolve(data);
								  }

								  // Otherwise we stream until we have it
								  var buffer = Buffer.from(''),
								      self = this;

								  // Collects everything piped into it in `buffer`; passes nothing on.
								  var concatStream = new Stream.Transform();
								  concatStream._transform = function(d,e,cb) {
								    buffer = Buffer.concat([buffer,d]);
								    cb();
								  };

								  var pullStreamRejectHandler;
								  return new Promise(function(resolve,reject) {
								    pullStreamRejectHandler = function(e) {
								      // Remember the error on the instance before rejecting.
								      self.__emittedError = e;
								      reject(e);
								    };
								    if (self.finished)
								      return reject(new Error('FILE_ENDED'));
								    self.once('error',pullStreamRejectHandler);  // reject any errors from pullstream itself
								    self.stream(eof,includeEof)
								      .on('error',reject)
								      .pipe(concatStream)
								      .on('finish',function() {resolve(buffer);})
								      .on('error',reject);
								  })
								  .finally(function() {
								    // Detach the one-shot error listener so it does not outlive this call.
								    self.removeListener('error',pullStreamRejectHandler);
								  });
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Readable-side hook of the Duplex interface. Nothing in this file pushes
								// data to the readable side, so the implementation is deliberately empty.
								PullStream.prototype._read = function() {
								  // intentionally empty
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								module.exports = PullStream;
							 |