You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							434 lines
						
					
					
						
							11 KiB
						
					
					
				
			
		
		
	
	
							434 lines
						
					
					
						
							11 KiB
						
					
					
				| 'use strict';
 | |
| 
 | |
| var stream = require('stream'),
 | |
|   util = require('util');
 | |
| 
 | |
| module.exports = GridFSBucketReadStream;
 | |
| 
 | |
/**
 * A readable stream that enables you to read buffers from GridFS.
 *
 * Do not instantiate this class directly. Use `openDownloadStream()` instead.
 *
 * @class
 * @extends external:Readable
 * @param {Collection} chunks Handle for chunks collection
 * @param {Collection} files Handle for files collection
 * @param {Object} readPreference The read preference to use
 * @param {Object} filter The query to use to find the file document
 * @param {Object} [options] Optional settings.
 * @param {Number} [options.sort] Optional sort for the file find query
 * @param {Number} [options.skip] Optional skip for the file find query
 * @param {Number} [options.start] Optional 0-based offset in bytes to start streaming from
 * @param {Number} [options.end] Optional 0-based offset in bytes to stop streaming before
 * @fires GridFSBucketReadStream#error
 * @fires GridFSBucketReadStream#file
 */
function GridFSBucketReadStream(chunks, files, readPreference, filter, options) {
  // All mutable stream state lives on `this.s`.
  this.s = {
    bytesRead: 0,
    chunks: chunks,
    cursor: null,
    expected: 0,
    files: files,
    filter: filter,
    init: false,
    expectedEnd: 0,
    file: null,
    // Always keep an object here: `start()` / `end()` assign properties on
    // it and the read path dereferences it, so `undefined` would throw.
    options: options || {},
    readPreference: readPreference
  };

  stream.Readable.call(this);
}

util.inherits(GridFSBucketReadStream, stream.Readable);
 | |
| 
 | |
| /**
 | |
|  * An error occurred
 | |
|  *
 | |
|  * @event GridFSBucketReadStream#error
 | |
|  * @type {Error}
 | |
|  */
 | |
| 
 | |
| /**
 | |
|  * Fires when the stream loaded the file document corresponding to the
 | |
|  * provided id.
 | |
|  *
 | |
|  * @event GridFSBucketReadStream#file
 | |
|  * @type {object}
 | |
|  */
 | |
| 
 | |
| /**
 | |
|  * Emitted when a chunk of data is available to be consumed.
 | |
|  *
 | |
|  * @event GridFSBucketReadStream#data
 | |
|  * @type {object}
 | |
|  */
 | |
| 
 | |
| /**
 | |
|  * Fired when the stream is exhausted (no more data events).
 | |
|  *
 | |
|  * @event GridFSBucketReadStream#end
 | |
|  * @type {object}
 | |
|  */
 | |
| 
 | |
| /**
 | |
|  * Fired when the stream is exhausted and the underlying cursor is killed
 | |
|  *
 | |
|  * @event GridFSBucketReadStream#close
 | |
|  * @type {object}
 | |
|  */
 | |
| 
 | |
/**
 * Readable `_read` hook: once the file document is available, fetches the
 * next chunk from the cursor and pushes it to the stream.
 * Private Impl, do not call directly
 * @ignore
 * @method
 */

GridFSBucketReadStream.prototype._read = function() {
  // Nothing to do for a stream that has been aborted.
  if (this.destroyed) {
    return;
  }

  const self = this;
  waitForFile(self, function onFileReady() {
    doRead(self);
  });
};
 | |
| 
 | |
/**
 * Sets the 0-based offset in bytes to start streaming from. Throws
 * an error if this stream has entered flowing mode
 * (e.g. if you've already called `on('data')`)
 * @method
 * @param {Number} start Offset in bytes to start reading at
 * @return {GridFSBucketReadStream} Reference to Self
 */

GridFSBucketReadStream.prototype.start = function(start) {
  const self = this;

  // Options are frozen once the stream has been initialized.
  throwIfInitialized(self);
  self.s.options.start = start;

  return self;
};
 | |
| 
 | |
/**
 * Sets the 0-based offset in bytes to stop streaming before. Throws
 * an error if this stream has entered flowing mode
 * (e.g. if you've already called `on('data')`)
 * @method
 * @param {Number} end Offset in bytes to stop reading at
 * @return {GridFSBucketReadStream} Reference to self
 */

GridFSBucketReadStream.prototype.end = function(end) {
  // Options can only be changed before the stream has been initialized.
  throwIfInitialized(this);
  this.s.options.end = end;
  return this;
};
 | |
| 
 | |
/**
 * Marks this stream as aborted (will never push another `data` event)
 * and kills the underlying cursor. Will emit the 'end' event, and then
 * the 'close' event once the cursor is successfully killed.
 *
 * @method
 * @param {GridFSBucket~errorCallback} [callback] called when the cursor is successfully closed or an error occurred.
 * @fires GridFSBucketReadStream#close
 * @fires GridFSBucketReadStream#end
 */

GridFSBucketReadStream.prototype.abort = function(callback) {
  var _this = this;

  // Signal EOF to consumers, then flag the stream so that pending and
  // future reads become no-ops.
  this.push(null);
  this.destroyed = true;

  if (this.s.cursor) {
    // 'close' is emitted whether or not closing the cursor failed; the
    // error (if any) is only reported through the callback.
    this.s.cursor.close(function(error) {
      _this.emit('close');
      callback && callback(error);
    });
  } else {
    if (!this.s.init) {
      // If not initialized, fire close event because we will never
      // get a cursor
      _this.emit('close');
    }
    // When initialization has started but no cursor exists yet, no
    // 'close' is emitted from here.
    callback && callback();
  }
};
 | |
| 
 | |
/**
 * Guards option setters: throws once the stream has started initializing,
 * because the options have already been consumed at that point.
 *
 * @ignore
 * @param {GridFSBucketReadStream} self The stream whose `s.init` flag is checked
 * @throws {Error} If `self.s.init` is truthy
 */

function throwIfInitialized(self) {
  if (self.s.init) {
    throw new Error('You cannot change options after the stream has entered flowing mode!');
  }
}
 | |
| 
 | |
/**
 * Fetches the next chunk document from the chunks cursor, validates its
 * sequence number and size against the file document, and pushes its bytes
 * (trimmed to the requested start/end range) to the stream.
 *
 * When the cursor is exhausted, pushes EOF, closes the cursor on the next
 * tick and emits 'close'. Validation and cursor failures are reported
 * through the stream's 'error' event.
 *
 * @ignore
 * @param {GridFSBucketReadStream} _this The stream being read
 */

function doRead(_this) {
  if (_this.destroyed) {
    return;
  }

  _this.s.cursor.next(function(error, doc) {
    // The stream may have been aborted while the cursor was fetching.
    if (_this.destroyed) {
      return;
    }
    if (error) {
      return __handleError(_this, error);
    }
    if (!doc) {
      _this.push(null);

      process.nextTick(() => {
        _this.s.cursor.close(function(error) {
          if (error) {
            __handleError(_this, error);
            return;
          }

          _this.emit('close');
        });
      });

      return;
    }

    const bytesRemaining = _this.s.file.length - _this.s.bytesRead;
    const expectedN = _this.s.expected++;
    const expectedLength = Math.min(_this.s.file.chunkSize, bytesRemaining);
    let errmsg;

    if (doc.n > expectedN) {
      errmsg = 'ChunkIsMissing: Got unexpected n: ' + doc.n + ', expected: ' + expectedN;
      return __handleError(_this, new Error(errmsg));
    }

    if (doc.n < expectedN) {
      errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n + ', expected: ' + expectedN;
      return __handleError(_this, new Error(errmsg));
    }

    // Chunk data is either a raw Buffer or a wrapper exposing the bytes
    // through `.buffer`.
    let buf = Buffer.isBuffer(doc.data) ? doc.data : doc.data.buffer;

    if (buf.length !== expectedLength) {
      if (bytesRemaining <= 0) {
        errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n;
        return __handleError(_this, new Error(errmsg));
      }

      errmsg =
        'ChunkIsWrongSize: Got unexpected length: ' + buf.length + ', expected: ' + expectedLength;
      return __handleError(_this, new Error(errmsg));
    }

    _this.s.bytesRead += buf.length;

    if (buf.length === 0) {
      return _this.push(null);
    }

    let sliceStart = null;
    let sliceEnd = null;

    // The start offset only applies to the first chunk delivered; reset it
    // so later chunks are passed through from their first byte.
    if (_this.s.bytesToSkip != null) {
      sliceStart = _this.s.bytesToSkip;
      _this.s.bytesToSkip = 0;
    }

    // Tolerate a stream whose options were never set.
    const endOffset = _this.s.options ? _this.s.options.end : undefined;
    const atEndOfStream = expectedN === _this.s.expectedEnd - 1;
    const bytesLeftToRead = endOffset - _this.s.bytesToSkip;
    if (atEndOfStream && _this.s.bytesToTrim != null) {
      sliceEnd = _this.s.file.chunkSize - _this.s.bytesToTrim;
    } else if (endOffset && bytesLeftToRead < buf.length) {
      // Measure the extracted buffer: `doc.data` may be a plain Buffer,
      // whose `length` is a property rather than a method.
      sliceEnd = bytesLeftToRead;
    }

    if (sliceStart != null || sliceEnd != null) {
      buf = buf.slice(sliceStart || 0, sliceEnd || buf.length);
    }

    _this.push(buf);
  });
}
 | |
| 
 | |
/**
 * Looks up the file document matching the stream's filter and, when found,
 * opens a cursor over its chunks (sorted by `n`), applies the start/end
 * options and emits the 'file' event.
 *
 * Lookup failures, a missing file and invalid start/end options are reported
 * through the stream's 'error' event.
 *
 * @ignore
 * @param {GridFSBucketReadStream} self The stream to initialize
 */

function init(self) {
  const findOneOptions = {};
  if (self.s.readPreference) {
    findOneOptions.readPreference = self.s.readPreference;
  }
  if (self.s.options && self.s.options.sort) {
    findOneOptions.sort = self.s.options.sort;
  }
  if (self.s.options && self.s.options.skip) {
    findOneOptions.skip = self.s.options.skip;
  }

  self.s.files.findOne(self.s.filter, findOneOptions, function(error, doc) {
    if (error) {
      return __handleError(self, error);
    }

    if (!doc) {
      const identifier = self.s.filter._id ? self.s.filter._id.toString() : self.s.filter.filename;
      const errmsg = 'FileNotFound: file ' + identifier + ' was not found';
      const err = new Error(errmsg);
      err.code = 'ENOENT';
      return __handleError(self, err);
    }

    // This check must come before the empty-file shortcut below, otherwise
    // an aborted stream would be pushed EOF again and never report 'close'.
    if (self.destroyed) {
      // If user destroys the stream before we have a cursor, wait
      // until the query is done to say we're 'closed' because we can't
      // cancel a query.
      self.emit('close');
      return;
    }

    // If document is empty, kill the stream immediately and don't
    // execute any reads
    if (doc.length <= 0) {
      self.push(null);
      return;
    }

    try {
      self.s.bytesToSkip = handleStartOption(self, doc, self.s.options);
    } catch (error) {
      return __handleError(self, error);
    }

    const filter = { files_id: doc._id };

    // Currently (MongoDB 3.4.4) skip function does not support the index,
    // it needs to retrieve all the documents first and then skip them. (CS-25811)
    // As work around we use $gte on the "n" field.
    if (self.s.options && self.s.options.start != null) {
      const skip = Math.floor(self.s.options.start / doc.chunkSize);
      if (skip > 0) {
        filter['n'] = { $gte: skip };
      }
    }
    self.s.cursor = self.s.chunks.find(filter).sort({ n: 1 });

    if (self.s.readPreference) {
      self.s.cursor.setReadPreference(self.s.readPreference);
    }

    // Default to reading every chunk; handleEndOption narrows this when an
    // end offset was requested.
    self.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
    self.s.file = doc;

    try {
      self.s.bytesToTrim = handleEndOption(self, doc, self.s.cursor, self.s.options);
    } catch (error) {
      return __handleError(self, error);
    }

    self.emit('file', doc);
  });
}
 | |
| 
 | |
/**
 * Invokes `callback` once the stream's file document is available. If it is
 * already loaded the callback runs immediately; otherwise initialization is
 * started (at most once) and the callback runs on the next 'file' event.
 *
 * @ignore
 * @param {GridFSBucketReadStream} _this The stream to wait on
 * @param {Function} callback Called with no arguments once the file is loaded
 */

function waitForFile(_this, callback) {
  const state = _this.s;

  if (state.file) {
    return callback();
  }

  const needsInit = !state.init;
  if (needsInit) {
    init(_this);
    state.init = true;
  }

  _this.once('file', function onFile() {
    callback();
  });
}
 | |
| 
 | |
/**
 * Validates `options.start` against the file document and positions the
 * stream's counters on the chunk that contains the start offset.
 *
 * @ignore
 * @param {GridFSBucketReadStream} stream Stream whose `s.bytesRead` / `s.expected` are updated
 * @param {Object} doc File document (`length` and `chunkSize` are read)
 * @param {Object} [options] Stream options (`start` and `end` are read)
 * @return {Number|undefined} Bytes to skip inside the first chunk, or undefined when no start is set
 * @throws {Error} If start is past the file length, negative, or greater than end
 */

function handleStartOption(stream, doc, options) {
  if (!options || options.start == null) {
    return;
  }

  var message;

  if (options.start > doc.length) {
    message =
      'Stream start (' +
      options.start +
      ') must not be more than the length of the file (' +
      doc.length +
      ')';
    throw new Error(message);
  }

  if (options.start < 0) {
    message = 'Stream start (' + options.start + ') must not be negative';
    throw new Error(message);
  }

  if (options.end != null && options.end < options.start) {
    message =
      'Stream start (' +
      options.start +
      ') must not be greater than stream end (' +
      options.end +
      ')';
    throw new Error(message);
  }

  // Index of the chunk that contains the start offset.
  var firstChunk = Math.floor(options.start / doc.chunkSize);

  stream.s.bytesRead = firstChunk * doc.chunkSize;
  stream.s.expected = firstChunk;

  return options.start - stream.s.bytesRead;
}
 | |
| 
 | |
/**
 * Validates `options.end` against the file document, limits the chunks
 * cursor to the chunks needed for the requested range and records the index
 * one past the last expected chunk on the stream.
 *
 * @ignore
 * @param {GridFSBucketReadStream} stream Stream whose `s.expectedEnd` is updated
 * @param {Object} doc File document (`length` and `chunkSize` are read)
 * @param {Object} cursor Chunks cursor; `limit()` is called on it
 * @param {Object} [options] Stream options (`start` and `end` are read)
 * @return {Number|undefined} Bytes to trim from the end of the last chunk, or undefined when no end is set
 * @throws {Error} If end is past the file length or negative
 */

function handleEndOption(stream, doc, cursor, options) {
  if (options && options.end != null) {
    if (options.end > doc.length) {
      throw new Error(
        'Stream end (' +
          options.end +
          ') must not be more than the length of the file (' +
          doc.length +
          ')'
      );
    }
    // Check the end offset itself; the start offset has its own validation.
    if (options.end < 0) {
      throw new Error('Stream end (' + options.end + ') must not be negative');
    }

    const firstChunk = options.start != null ? Math.floor(options.start / doc.chunkSize) : 0;
    // One past the index of the last chunk that holds requested bytes.
    const endChunk = Math.ceil(options.end / doc.chunkSize);

    cursor.limit(endChunk - firstChunk);

    stream.s.expectedEnd = endChunk;

    return endChunk * doc.chunkSize - options.end;
  }
}
 | |
| 
 | |
/**
 * Reports a failure by emitting it as an 'error' event on the stream.
 *
 * @ignore
 * @param {GridFSBucketReadStream} _this Stream to emit on
 * @param {Error} error The failure to report
 */

function __handleError(_this, error) {
  var eventName = 'error';
  _this.emit(eventName, error);
}
 |