You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							313 lines
						
					
					
						
							12 KiB
						
					
					
				
			
		
		
	
	
							313 lines
						
					
					
						
							12 KiB
						
					
					
				| "use strict";
 | |
| Object.defineProperty(exports, "__esModule", { value: true });
 | |
| exports.GridFSBucketReadStream = void 0;
 | |
| const stream_1 = require("stream");
 | |
| const error_1 = require("../error");
 | |
| /**
 | |
|  * A readable stream that enables you to read buffers from GridFS.
 | |
|  *
 | |
|  * Do not instantiate this class directly. Use `openDownloadStream()` instead.
 | |
|  * @public
 | |
|  */
 | |
| class GridFSBucketReadStream extends stream_1.Readable {
 | |
|     /**
 | |
|      * @param chunks - Handle for chunks collection
 | |
|      * @param files - Handle for files collection
 | |
|      * @param readPreference - The read preference to use
 | |
|      * @param filter - The filter to use to find the file document
 | |
|      * @internal
 | |
|      */
 | |
|     constructor(chunks, files, readPreference, filter, options) {
 | |
|         super();
 | |
|         this.s = {
 | |
|             bytesToTrim: 0,
 | |
|             bytesToSkip: 0,
 | |
|             bytesRead: 0,
 | |
|             chunks,
 | |
|             expected: 0,
 | |
|             files,
 | |
|             filter,
 | |
|             init: false,
 | |
|             expectedEnd: 0,
 | |
|             options: {
 | |
|                 start: 0,
 | |
|                 end: 0,
 | |
|                 ...options
 | |
|             },
 | |
|             readPreference
 | |
|         };
 | |
|     }
 | |
|     /**
 | |
|      * Reads from the cursor and pushes to the stream.
 | |
|      * Private Impl, do not call directly
 | |
|      * @internal
 | |
|      */
 | |
|     _read() {
 | |
|         if (this.destroyed)
 | |
|             return;
 | |
|         waitForFile(this, () => doRead(this));
 | |
|     }
 | |
|     /**
 | |
|      * Sets the 0-based offset in bytes to start streaming from. Throws
 | |
|      * an error if this stream has entered flowing mode
 | |
|      * (e.g. if you've already called `on('data')`)
 | |
|      *
 | |
|      * @param start - 0-based offset in bytes to start streaming from
 | |
|      */
 | |
|     start(start = 0) {
 | |
|         throwIfInitialized(this);
 | |
|         this.s.options.start = start;
 | |
|         return this;
 | |
|     }
 | |
|     /**
 | |
|      * Sets the 0-based offset in bytes to start streaming from. Throws
 | |
|      * an error if this stream has entered flowing mode
 | |
|      * (e.g. if you've already called `on('data')`)
 | |
|      *
 | |
|      * @param end - Offset in bytes to stop reading at
 | |
|      */
 | |
|     end(end = 0) {
 | |
|         throwIfInitialized(this);
 | |
|         this.s.options.end = end;
 | |
|         return this;
 | |
|     }
 | |
|     /**
 | |
|      * Marks this stream as aborted (will never push another `data` event)
 | |
|      * and kills the underlying cursor. Will emit the 'end' event, and then
 | |
|      * the 'close' event once the cursor is successfully killed.
 | |
|      */
 | |
|     async abort() {
 | |
|         this.push(null);
 | |
|         this.destroyed = true;
 | |
|         if (this.s.cursor) {
 | |
|             try {
 | |
|                 await this.s.cursor.close();
 | |
|             }
 | |
|             finally {
 | |
|                 this.emit(GridFSBucketReadStream.CLOSE);
 | |
|             }
 | |
|         }
 | |
|         else {
 | |
|             if (!this.s.init) {
 | |
|                 // If not initialized, fire close event because we will never
 | |
|                 // get a cursor
 | |
|                 this.emit(GridFSBucketReadStream.CLOSE);
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| }
 | |
| exports.GridFSBucketReadStream = GridFSBucketReadStream;
 | |
| /**
 | |
|  * An error occurred
 | |
|  * @event
 | |
|  */
 | |
| GridFSBucketReadStream.ERROR = 'error';
 | |
| /**
 | |
|  * Fires when the stream loaded the file document corresponding to the provided id.
 | |
|  * @event
 | |
|  */
 | |
| GridFSBucketReadStream.FILE = 'file';
 | |
| /**
 | |
|  * Emitted when a chunk of data is available to be consumed.
 | |
|  * @event
 | |
|  */
 | |
| GridFSBucketReadStream.DATA = 'data';
 | |
| /**
 | |
|  * Fired when the stream is exhausted (no more data events).
 | |
|  * @event
 | |
|  */
 | |
| GridFSBucketReadStream.END = 'end';
 | |
| /**
 | |
|  * Fired when the stream is exhausted and the underlying cursor is killed
 | |
|  * @event
 | |
|  */
 | |
| GridFSBucketReadStream.CLOSE = 'close';
 | |
| function throwIfInitialized(stream) {
 | |
|     if (stream.s.init) {
 | |
|         throw new error_1.MongoGridFSStreamError('Options cannot be changed after the stream is initialized');
 | |
|     }
 | |
| }
 | |
| function doRead(stream) {
 | |
|     if (stream.destroyed)
 | |
|         return;
 | |
|     if (!stream.s.cursor)
 | |
|         return;
 | |
|     if (!stream.s.file)
 | |
|         return;
 | |
|     const handleReadResult = ({ error, doc }) => {
 | |
|         if (stream.destroyed) {
 | |
|             return;
 | |
|         }
 | |
|         if (error) {
 | |
|             stream.emit(GridFSBucketReadStream.ERROR, error);
 | |
|             return;
 | |
|         }
 | |
|         if (!doc) {
 | |
|             stream.push(null);
 | |
|             stream.s.cursor?.close().then(() => {
 | |
|                 stream.emit(GridFSBucketReadStream.CLOSE);
 | |
|             }, error => {
 | |
|                 stream.emit(GridFSBucketReadStream.ERROR, error);
 | |
|             });
 | |
|             return;
 | |
|         }
 | |
|         if (!stream.s.file)
 | |
|             return;
 | |
|         const bytesRemaining = stream.s.file.length - stream.s.bytesRead;
 | |
|         const expectedN = stream.s.expected++;
 | |
|         const expectedLength = Math.min(stream.s.file.chunkSize, bytesRemaining);
 | |
|         if (doc.n > expectedN) {
 | |
|             return stream.emit(GridFSBucketReadStream.ERROR, new error_1.MongoGridFSChunkError(`ChunkIsMissing: Got unexpected n: ${doc.n}, expected: ${expectedN}`));
 | |
|         }
 | |
|         if (doc.n < expectedN) {
 | |
|             return stream.emit(GridFSBucketReadStream.ERROR, new error_1.MongoGridFSChunkError(`ExtraChunk: Got unexpected n: ${doc.n}, expected: ${expectedN}`));
 | |
|         }
 | |
|         let buf = Buffer.isBuffer(doc.data) ? doc.data : doc.data.buffer;
 | |
|         if (buf.byteLength !== expectedLength) {
 | |
|             if (bytesRemaining <= 0) {
 | |
|                 return stream.emit(GridFSBucketReadStream.ERROR, new error_1.MongoGridFSChunkError(`ExtraChunk: Got unexpected n: ${doc.n}, expected file length ${stream.s.file.length} bytes but already read ${stream.s.bytesRead} bytes`));
 | |
|             }
 | |
|             return stream.emit(GridFSBucketReadStream.ERROR, new error_1.MongoGridFSChunkError(`ChunkIsWrongSize: Got unexpected length: ${buf.byteLength}, expected: ${expectedLength}`));
 | |
|         }
 | |
|         stream.s.bytesRead += buf.byteLength;
 | |
|         if (buf.byteLength === 0) {
 | |
|             return stream.push(null);
 | |
|         }
 | |
|         let sliceStart = null;
 | |
|         let sliceEnd = null;
 | |
|         if (stream.s.bytesToSkip != null) {
 | |
|             sliceStart = stream.s.bytesToSkip;
 | |
|             stream.s.bytesToSkip = 0;
 | |
|         }
 | |
|         const atEndOfStream = expectedN === stream.s.expectedEnd - 1;
 | |
|         const bytesLeftToRead = stream.s.options.end - stream.s.bytesToSkip;
 | |
|         if (atEndOfStream && stream.s.bytesToTrim != null) {
 | |
|             sliceEnd = stream.s.file.chunkSize - stream.s.bytesToTrim;
 | |
|         }
 | |
|         else if (stream.s.options.end && bytesLeftToRead < doc.data.byteLength) {
 | |
|             sliceEnd = bytesLeftToRead;
 | |
|         }
 | |
|         if (sliceStart != null || sliceEnd != null) {
 | |
|             buf = buf.slice(sliceStart || 0, sliceEnd || buf.byteLength);
 | |
|         }
 | |
|         stream.push(buf);
 | |
|         return;
 | |
|     };
 | |
|     stream.s.cursor.next().then(doc => handleReadResult({ error: null, doc }), error => handleReadResult({ error, doc: null }));
 | |
| }
 | |
| function init(stream) {
 | |
|     const findOneOptions = {};
 | |
|     if (stream.s.readPreference) {
 | |
|         findOneOptions.readPreference = stream.s.readPreference;
 | |
|     }
 | |
|     if (stream.s.options && stream.s.options.sort) {
 | |
|         findOneOptions.sort = stream.s.options.sort;
 | |
|     }
 | |
|     if (stream.s.options && stream.s.options.skip) {
 | |
|         findOneOptions.skip = stream.s.options.skip;
 | |
|     }
 | |
|     const handleReadResult = ({ error, doc }) => {
 | |
|         if (error) {
 | |
|             return stream.emit(GridFSBucketReadStream.ERROR, error);
 | |
|         }
 | |
|         if (!doc) {
 | |
|             const identifier = stream.s.filter._id
 | |
|                 ? stream.s.filter._id.toString()
 | |
|                 : stream.s.filter.filename;
 | |
|             const errmsg = `FileNotFound: file ${identifier} was not found`;
 | |
|             // TODO(NODE-3483)
 | |
|             const err = new error_1.MongoRuntimeError(errmsg);
 | |
|             err.code = 'ENOENT'; // TODO: NODE-3338 set property as part of constructor
 | |
|             return stream.emit(GridFSBucketReadStream.ERROR, err);
 | |
|         }
 | |
|         // If document is empty, kill the stream immediately and don't
 | |
|         // execute any reads
 | |
|         if (doc.length <= 0) {
 | |
|             stream.push(null);
 | |
|             return;
 | |
|         }
 | |
|         if (stream.destroyed) {
 | |
|             // If user destroys the stream before we have a cursor, wait
 | |
|             // until the query is done to say we're 'closed' because we can't
 | |
|             // cancel a query.
 | |
|             stream.emit(GridFSBucketReadStream.CLOSE);
 | |
|             return;
 | |
|         }
 | |
|         try {
 | |
|             stream.s.bytesToSkip = handleStartOption(stream, doc, stream.s.options);
 | |
|         }
 | |
|         catch (error) {
 | |
|             return stream.emit(GridFSBucketReadStream.ERROR, error);
 | |
|         }
 | |
|         const filter = { files_id: doc._id };
 | |
|         // Currently (MongoDB 3.4.4) skip function does not support the index,
 | |
|         // it needs to retrieve all the documents first and then skip them. (CS-25811)
 | |
|         // As work around we use $gte on the "n" field.
 | |
|         if (stream.s.options && stream.s.options.start != null) {
 | |
|             const skip = Math.floor(stream.s.options.start / doc.chunkSize);
 | |
|             if (skip > 0) {
 | |
|                 filter['n'] = { $gte: skip };
 | |
|             }
 | |
|         }
 | |
|         stream.s.cursor = stream.s.chunks.find(filter).sort({ n: 1 });
 | |
|         if (stream.s.readPreference) {
 | |
|             stream.s.cursor.withReadPreference(stream.s.readPreference);
 | |
|         }
 | |
|         stream.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
 | |
|         stream.s.file = doc;
 | |
|         try {
 | |
|             stream.s.bytesToTrim = handleEndOption(stream, doc, stream.s.cursor, stream.s.options);
 | |
|         }
 | |
|         catch (error) {
 | |
|             return stream.emit(GridFSBucketReadStream.ERROR, error);
 | |
|         }
 | |
|         stream.emit(GridFSBucketReadStream.FILE, doc);
 | |
|         return;
 | |
|     };
 | |
|     stream.s.files.findOne(stream.s.filter, findOneOptions).then(doc => handleReadResult({ error: null, doc }), error => handleReadResult({ error, doc: null }));
 | |
| }
 | |
| function waitForFile(stream, callback) {
 | |
|     if (stream.s.file) {
 | |
|         return callback();
 | |
|     }
 | |
|     if (!stream.s.init) {
 | |
|         init(stream);
 | |
|         stream.s.init = true;
 | |
|     }
 | |
|     stream.once('file', () => {
 | |
|         callback();
 | |
|     });
 | |
| }
 | |
| function handleStartOption(stream, doc, options) {
 | |
|     if (options && options.start != null) {
 | |
|         if (options.start > doc.length) {
 | |
|             throw new error_1.MongoInvalidArgumentError(`Stream start (${options.start}) must not be more than the length of the file (${doc.length})`);
 | |
|         }
 | |
|         if (options.start < 0) {
 | |
|             throw new error_1.MongoInvalidArgumentError(`Stream start (${options.start}) must not be negative`);
 | |
|         }
 | |
|         if (options.end != null && options.end < options.start) {
 | |
|             throw new error_1.MongoInvalidArgumentError(`Stream start (${options.start}) must not be greater than stream end (${options.end})`);
 | |
|         }
 | |
|         stream.s.bytesRead = Math.floor(options.start / doc.chunkSize) * doc.chunkSize;
 | |
|         stream.s.expected = Math.floor(options.start / doc.chunkSize);
 | |
|         return options.start - stream.s.bytesRead;
 | |
|     }
 | |
|     throw new error_1.MongoInvalidArgumentError('Start option must be defined');
 | |
| }
 | |
| function handleEndOption(stream, doc, cursor, options) {
 | |
|     if (options && options.end != null) {
 | |
|         if (options.end > doc.length) {
 | |
|             throw new error_1.MongoInvalidArgumentError(`Stream end (${options.end}) must not be more than the length of the file (${doc.length})`);
 | |
|         }
 | |
|         if (options.start == null || options.start < 0) {
 | |
|             throw new error_1.MongoInvalidArgumentError(`Stream end (${options.end}) must not be negative`);
 | |
|         }
 | |
|         const start = options.start != null ? Math.floor(options.start / doc.chunkSize) : 0;
 | |
|         cursor.limit(Math.ceil(options.end / doc.chunkSize) - start);
 | |
|         stream.s.expectedEnd = Math.ceil(options.end / doc.chunkSize);
 | |
|         return Math.ceil(options.end / doc.chunkSize) * doc.chunkSize - options.end;
 | |
|     }
 | |
|     throw new error_1.MongoInvalidArgumentError('End option must be defined');
 | |
| }
 | |
| //# sourceMappingURL=download.js.map
 |