You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					434 lines
				
				11 KiB
			
		
		
			
		
	
	
					434 lines
				
				11 KiB
			| 
											3 years ago
										 | 'use strict'; | ||
|  | 
 | ||
|  | var stream = require('stream'), | ||
|  |   util = require('util'); | ||
|  | 
 | ||
|  | module.exports = GridFSBucketReadStream; | ||
|  | 
 | ||
/**
 * A readable stream that enables you to read buffers from GridFS.
 *
 * Do not instantiate this class directly. Use `openDownloadStream()` instead.
 *
 * @class
 * @extends external:Readable
 * @param {Collection} chunks Handle for chunks collection
 * @param {Collection} files Handle for files collection
 * @param {Object} readPreference The read preference to use
 * @param {Object} filter The query to use to find the file document
 * @param {Object} [options] Optional settings. Defaults to an empty object when omitted.
 * @param {Number} [options.sort] Optional sort for the file find query
 * @param {Number} [options.skip] Optional skip for the file find query
 * @param {Number} [options.start] Optional 0-based offset in bytes to start streaming from
 * @param {Number} [options.end] Optional 0-based offset in bytes to stop streaming before
 * @fires GridFSBucketReadStream#error
 * @fires GridFSBucketReadStream#file
 */
function GridFSBucketReadStream(chunks, files, readPreference, filter, options) {
  // Internal state shared by the prototype methods and the private helpers.
  this.s = {
    bytesRead: 0,
    chunks: chunks,
    cursor: null,
    expected: 0,
    files: files,
    filter: filter,
    init: false,
    expectedEnd: 0,
    file: null,
    // `options` is documented as optional, yet `start()`, `end()` and the read
    // path dereference it unconditionally. Falling back to an empty object
    // keeps those code paths from throwing a TypeError when it is omitted.
    options: options || {},
    readPreference: readPreference
  };

  stream.Readable.call(this);
}

util.inherits(GridFSBucketReadStream, stream.Readable);
|  | 
 | ||
|  | /** | ||
|  |  * An error occurred | ||
|  |  * | ||
|  |  * @event GridFSBucketReadStream#error | ||
|  |  * @type {Error} | ||
|  |  */ | ||
|  | 
 | ||
|  | /** | ||
|  |  * Fires when the stream loaded the file document corresponding to the | ||
|  |  * provided id. | ||
|  |  * | ||
|  |  * @event GridFSBucketReadStream#file | ||
|  |  * @type {object} | ||
|  |  */ | ||
|  | 
 | ||
|  | /** | ||
|  |  * Emitted when a chunk of data is available to be consumed. | ||
|  |  * | ||
|  |  * @event GridFSBucketReadStream#data | ||
|  |  * @type {object} | ||
|  |  */ | ||
|  | 
 | ||
|  | /** | ||
|  |  * Fired when the stream is exhausted (no more data events). | ||
|  |  * | ||
|  |  * @event GridFSBucketReadStream#end | ||
|  |  * @type {object} | ||
|  |  */ | ||
|  | 
 | ||
|  | /** | ||
|  |  * Fired when the stream is exhausted and the underlying cursor is killed | ||
|  |  * | ||
|  |  * @event GridFSBucketReadStream#close | ||
|  |  * @type {object} | ||
|  |  */ | ||
|  | 
 | ||
/**
 * Reads from the cursor and pushes to the stream: waits until the file
 * document is available, then performs a single chunk read.
 * Private Impl, do not call directly
 * @ignore
 * @method
 */
GridFSBucketReadStream.prototype._read = function() {
  // An aborted stream must not trigger any further reads.
  if (!this.destroyed) {
    var readable = this;
    waitForFile(readable, function onFileReady() {
      doRead(readable);
    });
  }
};
|  | 
 | ||
/**
 * Sets the 0-based offset in bytes to start streaming from. Throws
 * an error if the stream has already been initialized
 * (e.g. if you've already called `on('data')`).
 *
 * @method
 * @param {Number} start Offset in bytes to start reading at
 * @return {GridFSBucketReadStream} Reference to self
 */
GridFSBucketReadStream.prototype.start = function(start) {
  var readable = this;

  // Options are frozen once initialization has begun.
  throwIfInitialized(readable);
  readable.s.options.start = start;

  return readable;
};
|  | 
 | ||
/**
 * Sets the 0-based offset in bytes to stop streaming before. Throws
 * an error if the stream has already been initialized
 * (e.g. if you've already called `on('data')`).
 *
 * @method
 * @param {Number} end Offset in bytes to stop reading at
 * @return {GridFSBucketReadStream} Reference to self
 */
GridFSBucketReadStream.prototype.end = function(end) {
  var readable = this;

  // Options are frozen once initialization has begun.
  throwIfInitialized(readable);
  readable.s.options.end = end;

  return readable;
};
|  | 
 | ||
/**
 * Marks this stream as aborted (will never push another `data` event)
 * and closes the underlying cursor, if there is one. Signals end-of-stream
 * by pushing `null`, and emits 'close' once the cursor's close callback
 * has run.
 *
 * @method
 * @param {GridFSBucket~errorCallback} [callback] called when the cursor has been closed or an error occurred.
 * @fires GridFSBucketReadStream#close
 * @fires GridFSBucketReadStream#end
 */
GridFSBucketReadStream.prototype.abort = function(callback) {
  var readable = this;

  readable.push(null);
  readable.destroyed = true;

  var cursor = readable.s.cursor;
  if (!cursor) {
    // No cursor yet. If initialization never started there will never be
    // one, so 'close' is emitted right away; otherwise the pending file
    // lookup is left to report it.
    if (!readable.s.init) {
      readable.emit('close');
    }
    if (callback) {
      callback();
    }
    return;
  }

  cursor.close(function onCursorClosed(error) {
    readable.emit('close');
    if (callback) {
      callback(error);
    }
  });
};
|  | 
 | ||
/**
 * Guards the option setters: throws once the stream has started
 * initializing, because the options have already been consumed by then.
 *
 * @ignore
 * @param {GridFSBucketReadStream} self Stream whose `s.init` flag is checked
 * @throws {Error} If `self.s.init` is truthy
 */
function throwIfInitialized(self) {
  if (self.s.init) {
    // The original concatenation dropped the space between "entered" and
    // "flowing", producing "enteredflowing mode!".
    throw new Error('You cannot change options after the stream has entered flowing mode!');
  }
}
|  | 
 | ||
/**
 * Pulls the next chunk document from the cursor, validates its sequence
 * number and size against the file document, trims it according to the
 * start/end options and pushes the resulting buffer to the stream.
 *
 * When the cursor is exhausted, end-of-stream is signalled and the cursor is
 * closed on the next tick, after which 'close' is emitted.
 *
 * @ignore
 * @param {GridFSBucketReadStream} _this Stream to read for; expects
 *   `_this.s.cursor` and `_this.s.file` to be populated already
 */
function doRead(_this) {
  if (_this.destroyed) {
    return;
  }

  _this.s.cursor.next(function(error, doc) {
    // The stream may have been aborted while the cursor was fetching.
    if (_this.destroyed) {
      return;
    }
    if (error) {
      return __handleError(_this, error);
    }
    if (!doc) {
      _this.push(null);

      process.nextTick(() => {
        _this.s.cursor.close(function(error) {
          if (error) {
            __handleError(_this, error);
            return;
          }

          _this.emit('close');
        });
      });

      return;
    }

    var bytesRemaining = _this.s.file.length - _this.s.bytesRead;
    var expectedN = _this.s.expected++;
    var expectedLength = Math.min(_this.s.file.chunkSize, bytesRemaining);
    var errmsg;

    if (doc.n > expectedN) {
      errmsg = 'ChunkIsMissing: Got unexpected n: ' + doc.n + ', expected: ' + expectedN;
      return __handleError(_this, new Error(errmsg));
    }

    if (doc.n < expectedN) {
      errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n + ', expected: ' + expectedN;
      return __handleError(_this, new Error(errmsg));
    }

    // Chunk data is either a raw Buffer or a wrapper exposing one as `.buffer`.
    var buf = Buffer.isBuffer(doc.data) ? doc.data : doc.data.buffer;

    if (buf.length !== expectedLength) {
      if (bytesRemaining <= 0) {
        errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n;
        return __handleError(_this, new Error(errmsg));
      }

      errmsg =
        'ChunkIsWrongSize: Got unexpected length: ' + buf.length + ', expected: ' + expectedLength;
      return __handleError(_this, new Error(errmsg));
    }

    _this.s.bytesRead += buf.length;

    if (buf.length === 0) {
      return _this.push(null);
    }

    var sliceStart = null;
    var sliceEnd = null;

    // Only the first chunk read has leading bytes to skip.
    if (_this.s.bytesToSkip != null) {
      sliceStart = _this.s.bytesToSkip;
      _this.s.bytesToSkip = 0;
    }

    // Tolerate a stream constructed without an options object.
    const end = _this.s.options ? _this.s.options.end : undefined;
    const atEndOfStream = expectedN === _this.s.expectedEnd - 1;
    const bytesLeftToRead = end - _this.s.bytesToSkip;
    if (atEndOfStream && _this.s.bytesToTrim != null) {
      sliceEnd = _this.s.file.chunkSize - _this.s.bytesToTrim;
    } else if (end && bytesLeftToRead < buf.length) {
      // Compare against `buf.length`, not `doc.data.length()`: when the chunk
      // data is a plain Buffer, `length` is a number and calling it throws.
      sliceEnd = bytesLeftToRead;
    }

    if (sliceStart != null || sliceEnd != null) {
      buf = buf.slice(sliceStart || 0, sliceEnd || buf.length);
    }

    _this.push(buf);
  });
}
|  | 
 | ||
/**
 * Looks up the file document matching the stream's filter and, once found,
 * prepares the chunk cursor and the start/end bookkeeping before emitting
 * the 'file' event. Failures are routed through `__handleError`.
 *
 * @ignore
 * @param {GridFSBucketReadStream} self Stream being initialized
 */
function init(self) {
  var queryOptions = {};
  var requested = self.s.options;

  if (self.s.readPreference) {
    queryOptions.readPreference = self.s.readPreference;
  }
  if (requested && requested.sort) {
    queryOptions.sort = requested.sort;
  }
  if (requested && requested.skip) {
    queryOptions.skip = requested.skip;
  }

  self.s.files.findOne(self.s.filter, queryOptions, function onFileDocument(error, doc) {
    if (error) {
      return __handleError(self, error);
    }

    if (!doc) {
      var fileFilter = self.s.filter;
      var identifier = fileFilter._id ? fileFilter._id.toString() : fileFilter.filename;
      var notFound = new Error('FileNotFound: file ' + identifier + ' was not found');
      notFound.code = 'ENOENT';
      return __handleError(self, notFound);
    }

    // An empty file has nothing to read: end the stream immediately and
    // never create a cursor.
    if (doc.length <= 0) {
      self.push(null);
      return;
    }

    // The stream was destroyed while the lookup was in flight. The query
    // cannot be cancelled, so 'close' is reported only now that it is done.
    if (self.destroyed) {
      self.emit('close');
      return;
    }

    try {
      self.s.bytesToSkip = handleStartOption(self, doc, self.s.options);
    } catch (startError) {
      return __handleError(self, startError);
    }

    var chunkQuery = { files_id: doc._id };
    var streamOptions = self.s.options;

    // Currently (MongoDB 3.4.4) skip function does not support the index,
    // it needs to retrieve all the documents first and then skip them. (CS-25811)
    // As work around we use $gte on the "n" field.
    if (streamOptions && streamOptions.start != null) {
      var firstChunk = Math.floor(streamOptions.start / doc.chunkSize);
      if (firstChunk > 0) {
        chunkQuery.n = { $gte: firstChunk };
      }
    }

    self.s.cursor = self.s.chunks.find(chunkQuery).sort({ n: 1 });
    if (self.s.readPreference) {
      self.s.cursor.setReadPreference(self.s.readPreference);
    }

    self.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
    self.s.file = doc;

    try {
      self.s.bytesToTrim = handleEndOption(self, doc, self.s.cursor, self.s.options);
    } catch (endError) {
      return __handleError(self, endError);
    }

    self.emit('file', doc);
  });
}
|  | 
 | ||
/**
 * Runs `callback` as soon as the file document is available: immediately if
 * it is already loaded, otherwise after the next 'file' event. Kicks off
 * initialization the first time it is needed.
 *
 * @ignore
 * @param {GridFSBucketReadStream} _this Stream to wait on
 * @param {Function} callback Invoked with no arguments once the file is known
 */
function waitForFile(_this, callback) {
  if (!_this.s.file) {
    // Start the lookup only once; later callers just queue up on the event.
    if (!_this.s.init) {
      init(_this);
      _this.s.init = true;
    }

    _this.once('file', function onFile() {
      callback();
    });
    return;
  }

  return callback();
}
|  | 
 | ||
/**
 * Validates the `start` option against the file document and positions the
 * stream's counters on the chunk that contains the start offset.
 *
 * @ignore
 * @param {GridFSBucketReadStream} stream Stream whose `s.bytesRead` and `s.expected` are updated
 * @param {Object} doc File document; `length` and `chunkSize` are read
 * @param {Object} [options] Stream options; `start` and `end` are read
 * @return {Number|undefined} Bytes to skip at the beginning of the first chunk
 *   read, or undefined when no start offset was given
 * @throws {Error} If `start` exceeds the file length, is negative, or is greater than `end`
 */
function handleStartOption(stream, doc, options) {
  if (!options || options.start == null) {
    return;
  }

  var start = options.start;
  var end = options.end;

  if (start > doc.length) {
    throw new Error(
      'Stream start (' + start + ') must not be more than the length of the file (' + doc.length + ')'
    );
  }
  if (start < 0) {
    throw new Error('Stream start (' + start + ') must not be negative');
  }
  if (end != null && end < start) {
    throw new Error(
      'Stream start (' + start + ') must not be greater than stream end (' + end + ')'
    );
  }

  // Reading begins at the boundary of the chunk that holds `start`.
  var firstChunkIndex = Math.floor(start / doc.chunkSize);
  stream.s.bytesRead = firstChunkIndex * doc.chunkSize;
  stream.s.expected = firstChunkIndex;

  return start - stream.s.bytesRead;
}
|  | 
 | ||
/**
 * Validates the `end` option against the file document, limits the chunk
 * cursor to the chunks that are needed and records the index just past the
 * last expected chunk.
 *
 * @ignore
 * @param {GridFSBucketReadStream} stream Stream whose `s.expectedEnd` is updated
 * @param {Object} doc File document; `length` and `chunkSize` are read
 * @param {Object} cursor Chunk cursor; its `limit()` is called
 * @param {Object} [options] Stream options; `start` and `end` are read
 * @return {Number|undefined} Bytes between `end` and the end of the chunk
 *   boundary that follows it, or undefined when no end offset was given
 * @throws {Error} If `end` exceeds the file length or is negative
 */
function handleEndOption(stream, doc, cursor, options) {
  if (options && options.end != null) {
    if (options.end > doc.length) {
      throw new Error(
        'Stream end (' +
          options.end +
          ') must not be ' +
          'more than the length of the file (' +
          doc.length +
          ')'
      );
    }
    // This guard previously tested `options.start`, so a negative `end` was
    // never rejected even though the message talks about the stream end.
    if (options.end < 0) {
      throw new Error('Stream end (' + options.end + ') must not be ' + 'negative');
    }

    var start = options.start != null ? Math.floor(options.start / doc.chunkSize) : 0;
    var endChunk = Math.ceil(options.end / doc.chunkSize);

    // Fetch only the chunks between the start chunk and the end chunk.
    cursor.limit(endChunk - start);

    stream.s.expectedEnd = endChunk;

    return endChunk * doc.chunkSize - options.end;
  }
}
|  | 
 | ||
/**
 * Reports a failure to the stream's consumers by emitting it as an
 * 'error' event.
 *
 * @ignore
 * @param {GridFSBucketReadStream} _this Stream to emit on
 * @param {Error} error The error to report
 */
function __handleError(_this, error) {
  var emitter = _this;
  emitter.emit('error', error);
}