				| "use strict";
 | |
| Object.defineProperty(exports, "__esModule", { value: true });
 | |
| exports.GridFSBucketWriteStream = void 0;
 | |
| const stream_1 = require("stream");
 | |
| const bson_1 = require("../bson");
 | |
| const error_1 = require("../error");
 | |
| const write_concern_1 = require("./../write_concern");
 | |
| /**
 | |
|  * A writable stream that enables you to write buffers to GridFS.
 | |
|  *
 | |
|  * Do not instantiate this class directly. Use `openUploadStream()` instead.
 | |
|  * @public
 | |
|  */
 | |
| class GridFSBucketWriteStream extends stream_1.Writable {
 | |
|     /**
 | |
|      * @param bucket - Handle for this stream's corresponding bucket
 | |
|      * @param filename - The value of the 'filename' key in the files doc
 | |
|      * @param options - Optional settings.
 | |
|      * @internal
 | |
|      */
 | |
|     constructor(bucket, filename, options) {
 | |
|         super();
 | |
|         options = options ?? {};
 | |
|         this.bucket = bucket;
 | |
|         this.chunks = bucket.s._chunksCollection;
 | |
|         this.filename = filename;
 | |
|         this.files = bucket.s._filesCollection;
 | |
|         this.options = options;
 | |
|         this.writeConcern = write_concern_1.WriteConcern.fromOptions(options) || bucket.s.options.writeConcern;
 | |
|         // Signals the write is all done
 | |
|         this.done = false;
 | |
|         this.id = options.id ? options.id : new bson_1.ObjectId();
 | |
|         // properly inherit the default chunksize from parent
 | |
|         this.chunkSizeBytes = options.chunkSizeBytes || this.bucket.s.options.chunkSizeBytes;
 | |
|         this.bufToStore = Buffer.alloc(this.chunkSizeBytes);
 | |
|         this.length = 0;
 | |
|         this.n = 0;
 | |
|         this.pos = 0;
 | |
|         this.state = {
 | |
|             streamEnd: false,
 | |
|             outstandingRequests: 0,
 | |
|             errored: false,
 | |
|             aborted: false
 | |
|         };
 | |
|         if (!this.bucket.s.calledOpenUploadStream) {
 | |
|             this.bucket.s.calledOpenUploadStream = true;
 | |
|             checkIndexes(this).then(() => {
 | |
|                 this.bucket.s.checkedIndexes = true;
 | |
|                 this.bucket.emit('index');
 | |
|             }, () => null);
 | |
|         }
 | |
|     }
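    /**
     * Writes a chunk (Buffer or string) once the bucket's indexes have been
     * verified; per the notes in doWrite below, a false return value means the
     * caller should wait for the 'drain' event before writing more.
     */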
    write(chunk, encodingOrCallback, callback) {
        const encoding = typeof encodingOrCallback === 'function' ? undefined : encodingOrCallback;
        callback = typeof encodingOrCallback === 'function' ? encodingOrCallback : callback;
        return waitForIndexes(this, () => doWrite(this, chunk, encoding, callback));
    }
    /**
     * Places this write stream into an aborted state (all future writes fail)
     * and deletes all chunks that have already been written.
     */
    async abort() {
        if (this.state.streamEnd) {
            // TODO(NODE-3485): Replace with MongoGridFSStreamClosed
            throw new error_1.MongoAPIError('Cannot abort a stream that has already completed');
        }
        if (this.state.aborted) {
            // TODO(NODE-3485): Replace with MongoGridFSStreamClosed
            throw new error_1.MongoAPIError('Cannot call abort() on a stream twice');
        }
        this.state.aborted = true;
        await this.chunks.deleteMany({ files_id: this.id });
    }
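    /**
     * Signals that no more data will be written: flushes any buffered bytes as
     * a final chunk, writes the files document, and (when a callback is
     * supplied) invokes it with the resulting files document on 'finish'.
     */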
    end(chunkOrCallback, encodingOrCallback, callback) {
        const chunk = typeof chunkOrCallback === 'function' ? undefined : chunkOrCallback;
        const encoding = typeof encodingOrCallback === 'function' ? undefined : encodingOrCallback;
        callback =
            typeof chunkOrCallback === 'function'
                ? chunkOrCallback
                : typeof encodingOrCallback === 'function'
                    ? encodingOrCallback
                    : callback;
        if (this.state.streamEnd || checkAborted(this, callback))
            return this;
        this.state.streamEnd = true;
        if (callback) {
            this.once(GridFSBucketWriteStream.FINISH, (result) => {
                if (callback)
                    callback(undefined, result);
            });
        }
        if (!chunk) {
            waitForIndexes(this, () => !!writeRemnant(this));
            return this;
        }
        this.write(chunk, encoding, () => {
            writeRemnant(this);
        });
        return this;
    }
}
exports.GridFSBucketWriteStream = GridFSBucketWriteStream;
/** @event */
GridFSBucketWriteStream.CLOSE = 'close';
/** @event */
GridFSBucketWriteStream.ERROR = 'error';
/**
 * `end()` was called and the write stream successfully wrote the file metadata and all the chunks to MongoDB.
 * @event
 */
GridFSBucketWriteStream.FINISH = 'finish';
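/*
 * Usage sketch, assuming a connected `db` handle: streams of this class are
 * obtained from a GridFSBucket rather than constructed directly. The bucket
 * name and file name below are illustrative.
 *
 *   const bucket = new GridFSBucket(db, { bucketName: 'fs' });
 *   fs.createReadStream('./example.txt')
 *     .pipe(bucket.openUploadStream('example.txt', { chunkSizeBytes: 255 * 1024 }))
 *     .on('finish', () => console.log('upload complete'));
 */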
function __handleError(stream, error, callback) {
    if (stream.state.errored) {
        return;
    }
    stream.state.errored = true;
    if (callback) {
        return callback(error);
    }
    stream.emit(GridFSBucketWriteStream.ERROR, error);
}
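/** Builds a GridFS chunk document ({ _id, files_id, n, data }) for insertion into the chunks collection. */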
function createChunkDoc(filesId, n, data) {
    return {
        _id: new bson_1.ObjectId(),
        files_id: filesId,
        n,
        data
    };
}
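/**
 * Ensures the chunks collection has the unique { files_id: 1, n: 1 } index,
 * creating it (with the stream's write concern) when it is missing.
 */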
async function checkChunksIndex(stream) {
    const index = { files_id: 1, n: 1 };
    let indexes;
    try {
        indexes = await stream.chunks.listIndexes().toArray();
    }
    catch (error) {
        if (error instanceof error_1.MongoError && error.code === error_1.MONGODB_ERROR_CODES.NamespaceNotFound) {
            indexes = [];
        }
        else {
            throw error;
        }
    }
    const hasChunksIndex = !!indexes.find(index => {
        const keys = Object.keys(index.key);
        if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) {
            return true;
        }
        return false;
    });
    if (!hasChunksIndex) {
        const writeConcernOptions = getWriteOptions(stream);
        await stream.chunks.createIndex(index, {
            ...writeConcernOptions,
            background: true,
            unique: true
        });
    }
}
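/**
 * Once the stream has ended and no chunk inserts are outstanding, writes the
 * files document and emits 'finish' and 'close'. Returns true when the stream
 * is (or has just become) done.
 */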
function checkDone(stream, callback) {
    if (stream.done)
        return true;
    if (stream.state.streamEnd && stream.state.outstandingRequests === 0 && !stream.state.errored) {
        // Set done so we do not trigger duplicate createFilesDoc
        stream.done = true;
        // Create a new files doc
        const filesDoc = createFilesDoc(stream.id, stream.length, stream.chunkSizeBytes, stream.filename, stream.options.contentType, stream.options.aliases, stream.options.metadata);
        if (checkAborted(stream, callback)) {
            return false;
        }
        stream.files.insertOne(filesDoc, getWriteOptions(stream)).then(() => {
            stream.emit(GridFSBucketWriteStream.FINISH, filesDoc);
            stream.emit(GridFSBucketWriteStream.CLOSE);
        }, error => {
            return __handleError(stream, error, callback);
        });
        return true;
    }
    return false;
}
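/**
 * Verifies the { filename: 1, uploadDate: 1 } index on the files collection
 * (skipped when the collection already contains documents) and then checks
 * the chunks index.
 */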
async function checkIndexes(stream) {
    const doc = await stream.files.findOne({}, { projection: { _id: 1 } });
    if (doc != null) {
        // If at least one document exists assume the collection has the required index
        return;
    }
    const index = { filename: 1, uploadDate: 1 };
    let indexes;
    try {
        indexes = await stream.files.listIndexes().toArray();
    }
    catch (error) {
        if (error instanceof error_1.MongoError && error.code === error_1.MONGODB_ERROR_CODES.NamespaceNotFound) {
            indexes = [];
        }
        else {
            throw error;
        }
    }
    const hasFileIndex = !!indexes.find(index => {
        const keys = Object.keys(index.key);
        if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) {
            return true;
        }
        return false;
    });
    if (!hasFileIndex) {
        await stream.files.createIndex(index, { background: false });
    }
    await checkChunksIndex(stream);
}
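/** Builds the GridFS files document, including contentType, aliases and metadata only when they are provided. */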
function createFilesDoc(_id, length, chunkSize, filename, contentType, aliases, metadata) {
    const ret = {
        _id,
        length,
        chunkSize,
        uploadDate: new Date(),
        filename
    };
    if (contentType) {
        ret.contentType = contentType;
    }
    if (aliases) {
        ret.aliases = aliases;
    }
    if (metadata) {
        ret.metadata = metadata;
    }
    return ret;
}
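/**
 * Copies the incoming bytes into the stream's chunk buffer, flushing a full
 * chunk to the chunks collection each time chunkSizeBytes is reached. Returns
 * true if the caller may keep writing, false if it should wait for 'drain'.
 */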
function doWrite(stream, chunk, encoding, callback) {
    if (checkAborted(stream, callback)) {
        return false;
    }
    const inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    stream.length += inputBuf.length;
    // Input is small enough to fit in our buffer
    if (stream.pos + inputBuf.length < stream.chunkSizeBytes) {
        inputBuf.copy(stream.bufToStore, stream.pos);
        stream.pos += inputBuf.length;
        callback && callback();
        // Note that we reverse the typical semantics of write's return value
        // to be compatible with node's `.pipe()` function.
        // True means client can keep writing.
        return true;
    }
    // Otherwise, buffer is too big for current chunk, so we need to flush
    // to MongoDB.
    let inputBufRemaining = inputBuf.length;
    let spaceRemaining = stream.chunkSizeBytes - stream.pos;
    let numToCopy = Math.min(spaceRemaining, inputBuf.length);
    let outstandingRequests = 0;
    while (inputBufRemaining > 0) {
        const inputBufPos = inputBuf.length - inputBufRemaining;
        inputBuf.copy(stream.bufToStore, stream.pos, inputBufPos, inputBufPos + numToCopy);
        stream.pos += numToCopy;
        spaceRemaining -= numToCopy;
        let doc;
        if (spaceRemaining === 0) {
            doc = createChunkDoc(stream.id, stream.n, Buffer.from(stream.bufToStore));
            ++stream.state.outstandingRequests;
            ++outstandingRequests;
            if (checkAborted(stream, callback)) {
                return false;
            }
            stream.chunks.insertOne(doc, getWriteOptions(stream)).then(() => {
                --stream.state.outstandingRequests;
                --outstandingRequests;
                if (!outstandingRequests) {
                    stream.emit('drain', doc);
                    callback && callback();
                    checkDone(stream);
                }
            }, error => {
                return __handleError(stream, error);
            });
            spaceRemaining = stream.chunkSizeBytes;
            stream.pos = 0;
            ++stream.n;
        }
        inputBufRemaining -= numToCopy;
        numToCopy = Math.min(spaceRemaining, inputBufRemaining);
    }
    // Note that we reverse the typical semantics of write's return value
    // to be compatible with node's `.pipe()` function.
    // False means the client should wait for the 'drain' event.
    return false;
}
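/** Translates the stream's write concern, if any, into options (w, wtimeout, j) for insert operations. */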
function getWriteOptions(stream) {
    const obj = {};
    if (stream.writeConcern) {
        obj.writeConcern = {
            w: stream.writeConcern.w,
            wtimeout: stream.writeConcern.wtimeout,
            j: stream.writeConcern.j
        };
    }
    return obj;
}
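/** Runs the callback immediately when the bucket's indexes have already been checked, otherwise defers it until the bucket emits 'index'. */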
function waitForIndexes(stream, callback) {
    if (stream.bucket.s.checkedIndexes) {
        return callback(false);
    }
    stream.bucket.once('index', () => {
        callback(true);
    });
    return true;
}
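/** Flushes whatever remains in the chunk buffer as a final, possibly smaller, chunk before the stream finishes. */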
function writeRemnant(stream, callback) {
    // Buffer is empty, so don't bother to insert
    if (stream.pos === 0) {
        return checkDone(stream, callback);
    }
    ++stream.state.outstandingRequests;
    // Create a new buffer to make sure the buffer isn't bigger than it needs
    // to be.
    const remnant = Buffer.alloc(stream.pos);
    stream.bufToStore.copy(remnant, 0, 0, stream.pos);
    const doc = createChunkDoc(stream.id, stream.n, remnant);
    // If the stream was aborted, do not write remnant
    if (checkAborted(stream, callback)) {
        return false;
    }
    stream.chunks.insertOne(doc, getWriteOptions(stream)).then(() => {
        --stream.state.outstandingRequests;
        checkDone(stream);
    }, error => {
        return __handleError(stream, error);
    });
    return true;
}
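/** Returns true if the stream has been aborted, reporting a MongoAPIError through the callback when one is provided. */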
function checkAborted(stream, callback) {
    if (stream.state.aborted) {
        if (typeof callback === 'function') {
            // TODO(NODE-3485): Replace with MongoGridFSStreamClosedError
            callback(new error_1.MongoAPIError('Stream has been aborted'));
        }
        return true;
    }
    return false;
}
//# sourceMappingURL=upload.js.map