You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					342 lines
				
				12 KiB
			
		
		
			
		
	
	
					342 lines
				
				12 KiB
			| 
											3 years ago
										 | "use strict"; | ||
|  | Object.defineProperty(exports, "__esModule", { value: true }); | ||
|  | exports.GridFSBucketWriteStream = void 0; | ||
|  | const stream_1 = require("stream"); | ||
|  | const bson_1 = require("../bson"); | ||
|  | const error_1 = require("../error"); | ||
|  | const write_concern_1 = require("./../write_concern"); | ||
/**
 * Writable stream used to upload the contents of a file into a GridFS bucket.
 *
 * Application code should obtain instances through `openUploadStream()`
 * instead of calling the constructor.
 * @public
 */
class GridFSBucketWriteStream extends stream_1.Writable {
    /**
     * @param bucket - The bucket this stream uploads into
     * @param filename - Stored as the `filename` field of the files document
     * @param options - Optional settings.
     * @internal
     */
    constructor(bucket, filename, options) {
        super();
        const settings = options ?? {};
        this.bucket = bucket;
        this.chunks = bucket.s._chunksCollection;
        this.filename = filename;
        this.files = bucket.s._filesCollection;
        this.options = settings;
        // Stream-level write concern wins; otherwise fall back to the bucket's.
        this.writeConcern = write_concern_1.WriteConcern.fromOptions(settings) || bucket.s.options.writeConcern;
        // Flipped to true once the files document insert has been started.
        this.done = false;
        this.id = settings.id || new bson_1.ObjectId();
        // Use the bucket's chunk size unless the caller supplied one.
        this.chunkSizeBytes = settings.chunkSizeBytes || this.bucket.s.options.chunkSizeBytes;
        this.bufToStore = Buffer.alloc(this.chunkSizeBytes);
        this.length = 0;
        this.n = 0;
        this.pos = 0;
        this.state = {
            streamEnd: false,
            outstandingRequests: 0,
            errored: false,
            aborted: false
        };
        // Only the first upload stream opened on a bucket runs the index check.
        const bucketState = this.bucket.s;
        if (!bucketState.calledOpenUploadStream) {
            bucketState.calledOpenUploadStream = true;
            const announceIndexesReady = () => {
                this.bucket.s.checkedIndexes = true;
                this.bucket.emit('index');
            };
            // A rejected index check is deliberately ignored; note that in
            // that case 'index' is never emitted by this constructor.
            const ignoreFailure = () => null;
            checkIndexes(this).then(announceIndexesReady, ignoreFailure);
        }
    }
    /**
     * Queues `chunk` for upload once the bucket's index check has completed.
     *
     * @param chunk - Data to write
     * @param encodingOrCallback - Encoding for string input, or the callback
     * @param callback - Callback, when an encoding was given as second argument
     * @returns whatever `waitForIndexes` returns for the deferred write
     */
    write(chunk, encodingOrCallback, callback) {
        let encoding = encodingOrCallback;
        let onWritten = callback;
        if (typeof encodingOrCallback === 'function') {
            encoding = undefined;
            onWritten = encodingOrCallback;
        }
        return waitForIndexes(this, () => doWrite(this, chunk, encoding, onWritten));
    }
    /**
     * Marks the stream as aborted, which makes later writes fail, and removes
     * every chunk document already stored for this file id.
     */
    async abort() {
        const { state } = this;
        if (state.streamEnd) {
            // TODO(NODE-3485): Replace with MongoGridFSStreamClosed
            throw new error_1.MongoAPIError('Cannot abort a stream that has already completed');
        }
        if (state.aborted) {
            // TODO(NODE-3485): Replace with MongoGridFSStreamClosed
            throw new error_1.MongoAPIError('Cannot call abort() on a stream twice');
        }
        state.aborted = true;
        await this.chunks.deleteMany({ files_id: this.id });
    }
    /**
     * Ends the stream, optionally writing one last chunk first, and flushes
     * whatever is still sitting in the staging buffer.
     *
     * @param chunkOrCallback - Final data to write, or the callback
     * @param encodingOrCallback - Encoding for the final data, or the callback
     * @param callback - Called with `(undefined, filesDoc)` on 'finish'
     * @returns this stream
     */
    end(chunkOrCallback, encodingOrCallback, callback) {
        const chunkIsCallback = typeof chunkOrCallback === 'function';
        const encodingIsCallback = typeof encodingOrCallback === 'function';
        const chunk = chunkIsCallback ? undefined : chunkOrCallback;
        const encoding = encodingIsCallback ? undefined : encodingOrCallback;
        // The first function found among the arguments is the completion callback.
        let onFinish = callback;
        if (chunkIsCallback) {
            onFinish = chunkOrCallback;
        }
        else if (encodingIsCallback) {
            onFinish = encodingOrCallback;
        }
        if (this.state.streamEnd || checkAborted(this, onFinish)) {
            return this;
        }
        this.state.streamEnd = true;
        if (onFinish) {
            this.once(GridFSBucketWriteStream.FINISH, (result) => {
                onFinish(undefined, result);
            });
        }
        if (chunk) {
            // Flush the remainder only after the final chunk has been handled.
            this.write(chunk, encoding, () => {
                writeRemnant(this);
            });
        }
        else {
            waitForIndexes(this, () => !!writeRemnant(this));
        }
        return this;
    }
}
// Event names exposed as static properties of the stream class.
/** @event */
GridFSBucketWriteStream.CLOSE = 'close';
/** @event */
GridFSBucketWriteStream.ERROR = 'error';
/**
 * Emitted after `end()` once the files document has been inserted, i.e. the
 * file metadata and all of its chunks were written to MongoDB.
 * @event
 */
GridFSBucketWriteStream.FINISH = 'finish';
exports.GridFSBucketWriteStream = GridFSBucketWriteStream;
/**
 * Reports the first error seen on a stream; later errors are ignored.
 *
 * @param stream - The upload stream the error belongs to
 * @param error - The error to report
 * @param callback - When given, receives the error instead of an 'error' event
 * @returns the callback's return value when a callback is used, otherwise undefined
 */
function __handleError(stream, error, callback) {
    if (!stream.state.errored) {
        stream.state.errored = true;
        if (callback) {
            return callback(error);
        }
        stream.emit(GridFSBucketWriteStream.ERROR, error);
    }
}
/**
 * Builds one chunk document with a freshly generated `_id`.
 *
 * @param filesId - Id of the file the chunk belongs to (`files_id`)
 * @param n - Sequence number of the chunk
 * @param data - Payload of the chunk
 * @returns the chunk document
 */
function createChunkDoc(filesId, n, data) {
    const chunkDoc = { _id: new bson_1.ObjectId(), files_id: filesId, n, data };
    return chunkDoc;
}
/**
 * Makes sure the chunks collection has an index on exactly
 * `{ files_id: 1, n: 1 }`, creating it as a unique background index if absent.
 *
 * @param stream - Upload stream whose chunks collection is inspected
 */
async function checkChunksIndex(stream) {
    const wantedKey = { files_id: 1, n: 1 };
    let existingIndexes;
    try {
        existingIndexes = await stream.chunks.listIndexes().toArray();
    }
    catch (error) {
        const namespaceMissing = error instanceof error_1.MongoError && error.code === error_1.MONGODB_ERROR_CODES.NamespaceNotFound;
        if (!namespaceMissing) {
            throw error;
        }
        // A NamespaceNotFound error is treated as "no indexes yet".
        existingIndexes = [];
    }
    const alreadyIndexed = existingIndexes.some(({ key }) => Object.keys(key).length === 2 && key.files_id === 1 && key.n === 1);
    if (alreadyIndexed) {
        return;
    }
    const writeConcernOptions = getWriteOptions(stream);
    const indexOptions = { ...writeConcernOptions, background: true, unique: true };
    await stream.chunks.createIndex(wantedKey, indexOptions);
}
/**
 * Inserts the files document once the stream has ended, no chunk inserts are
 * outstanding and no error occurred; emits 'finish' then 'close' on success.
 *
 * @param stream - The upload stream to finalize
 * @param callback - Receives the abort error or a failed-insert error, if any
 * @returns `true` if finalization already started or was started by this call, otherwise `false`
 */
function checkDone(stream, callback) {
    if (stream.done) {
        return true;
    }
    const { state } = stream;
    const readyToFinalize = state.streamEnd && state.outstandingRequests === 0 && !state.errored;
    if (!readyToFinalize) {
        return false;
    }
    // Mark first so a second call cannot create another files document.
    stream.done = true;
    const { contentType, aliases, metadata } = stream.options;
    const filesDoc = createFilesDoc(stream.id, stream.length, stream.chunkSizeBytes, stream.filename, contentType, aliases, metadata);
    if (checkAborted(stream, callback)) {
        return false;
    }
    const announceFinished = () => {
        stream.emit(GridFSBucketWriteStream.FINISH, filesDoc);
        stream.emit(GridFSBucketWriteStream.CLOSE);
    };
    const reportFailure = error => __handleError(stream, error, callback);
    stream.files.insertOne(filesDoc, getWriteOptions(stream)).then(announceFinished, reportFailure);
    return true;
}
/**
 * Ensures the indexes used by the bucket exist. Does nothing when the files
 * collection already contains a document; otherwise creates the
 * `{ filename: 1, uploadDate: 1 }` index if missing and then checks the
 * chunks collection index.
 *
 * @param stream - Upload stream whose collections are inspected
 */
async function checkIndexes(stream) {
    const anyFile = await stream.files.findOne({}, { projection: { _id: 1 } });
    if (anyFile != null) {
        // An existing document is taken as a sign that the index is in place.
        return;
    }
    const wantedKey = { filename: 1, uploadDate: 1 };
    let existingIndexes;
    try {
        existingIndexes = await stream.files.listIndexes().toArray();
    }
    catch (error) {
        const namespaceMissing = error instanceof error_1.MongoError && error.code === error_1.MONGODB_ERROR_CODES.NamespaceNotFound;
        if (!namespaceMissing) {
            throw error;
        }
        // A NamespaceNotFound error is treated as "no indexes yet".
        existingIndexes = [];
    }
    const alreadyIndexed = existingIndexes.some(({ key }) => Object.keys(key).length === 2 && key.filename === 1 && key.uploadDate === 1);
    if (!alreadyIndexed) {
        await stream.files.createIndex(wantedKey, { background: false });
    }
    await checkChunksIndex(stream);
}
/**
 * Builds the files document describing an uploaded file. `uploadDate` is set
 * to the current time; `contentType`, `aliases` and `metadata` are only added
 * when they are truthy.
 *
 * @param _id - Id of the file
 * @param length - Value stored as `length`
 * @param chunkSize - Value stored as `chunkSize`
 * @param filename - Value stored as `filename`
 * @param contentType - Optional `contentType` field
 * @param aliases - Optional `aliases` field
 * @param metadata - Optional `metadata` field
 * @returns the files document
 */
function createFilesDoc(_id, length, chunkSize, filename, contentType, aliases, metadata) {
    const filesDoc = { _id, length, chunkSize, uploadDate: new Date(), filename };
    const optionalFields = { contentType, aliases, metadata };
    for (const [field, value] of Object.entries(optionalFields)) {
        if (value) {
            filesDoc[field] = value;
        }
    }
    return filesDoc;
}
/**
 * Appends `chunk` to the stream's staging buffer and starts a chunk-document
 * insert every time the staging buffer becomes full.
 *
 * @param stream - The upload stream being written to
 * @param chunk - Data to append; non-Buffer input goes through `Buffer.from(chunk, encoding)`
 * @param encoding - Encoding used when `chunk` is not a Buffer
 * @param callback - Called right away when the data was only buffered, otherwise
 *   after every insert started by this call has resolved; receives an error if the stream was aborted
 * @returns `true` when the data was only buffered (caller may keep writing),
 *   `false` when the stream was aborted or the caller should wait for 'drain'
 */
function doWrite(stream, chunk, encoding, callback) {
    if (checkAborted(stream, callback)) {
        return false;
    }
    const inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    const total = inputBuf.length;
    stream.length += total;
    // Fast path: the input does not fill the staging buffer, so just stash it.
    const fitsInStagingBuffer = stream.pos + total < stream.chunkSizeBytes;
    if (fitsInStagingBuffer) {
        inputBuf.copy(stream.bufToStore, stream.pos);
        stream.pos += total;
        if (callback) {
            callback();
        }
        // The return value mirrors what node's `.pipe()` expects from write():
        // true tells the producer it can keep writing.
        return true;
    }
    // Slow path: the input fills the staging buffer at least once, so full
    // chunks are flushed to MongoDB while copying.
    let bytesLeft = total;
    let room = stream.chunkSizeBytes - stream.pos;
    // Inserts started by this call that have not resolved yet.
    let inFlight = 0;
    const reportFailure = error => __handleError(stream, error);
    while (bytesLeft > 0) {
        const take = Math.min(room, bytesLeft);
        const from = total - bytesLeft;
        inputBuf.copy(stream.bufToStore, stream.pos, from, from + take);
        stream.pos += take;
        room -= take;
        bytesLeft -= take;
        if (room !== 0) {
            continue;
        }
        // Staging buffer is full: snapshot it into a chunk document.
        const doc = createChunkDoc(stream.id, stream.n, Buffer.from(stream.bufToStore));
        ++stream.state.outstandingRequests;
        ++inFlight;
        if (checkAborted(stream, callback)) {
            return false;
        }
        stream.chunks.insertOne(doc, getWriteOptions(stream)).then(() => {
            --stream.state.outstandingRequests;
            --inFlight;
            if (inFlight) {
                return;
            }
            // Last insert of this call resolved: signal drain and completion.
            stream.emit('drain', doc);
            if (callback) {
                callback();
            }
            checkDone(stream);
        }, reportFailure);
        // Start filling the next chunk from the beginning of the buffer.
        room = stream.chunkSizeBytes;
        stream.pos = 0;
        ++stream.n;
    }
    // As above, the return value follows `.pipe()` semantics: false asks the
    // producer to wait for the 'drain' event.
    return false;
}
/**
 * Builds the options object passed to write operations issued by the stream.
 *
 * @param stream - Object whose `writeConcern` (if truthy) is copied
 * @returns `{}` when the stream has no write concern, otherwise
 *   `{ writeConcern: { w, wtimeout, j } }` copied from `stream.writeConcern`
 */
function getWriteOptions(stream) {
    const { writeConcern } = stream;
    if (!writeConcern) {
        return {};
    }
    const { w, wtimeout, j } = writeConcern;
    return { writeConcern: { w, wtimeout, j } };
}
/**
 * Runs `callback` now if the bucket's index check has completed, otherwise
 * defers it until the bucket emits 'index'.
 *
 * @param stream - Upload stream; only `stream.bucket` is used
 * @param callback - Called with `false` when run immediately, `true` when it had to wait
 * @returns the callback's return value when run immediately, otherwise `true`
 */
function waitForIndexes(stream, callback) {
    const { bucket } = stream;
    if (!bucket.s.checkedIndexes) {
        bucket.once('index', () => {
            callback(true);
        });
        return true;
    }
    return callback(false);
}
/**
 * Flushes the partially filled staging buffer as the last chunk document and
 * then lets `checkDone` finalize the upload.
 *
 * @param stream - The upload stream being ended
 * @param callback - Forwarded to `checkDone` / `checkAborted`
 * @returns the result of `checkDone` when nothing is buffered, `false` when the
 *   stream was aborted, otherwise `true` once the insert has been started
 */
function writeRemnant(stream, callback) {
    // Nothing buffered: skip the insert and go straight to finalization.
    if (stream.pos === 0) {
        return checkDone(stream, callback);
    }
    ++stream.state.outstandingRequests;
    // Copy only the used part so the stored chunk is exactly `pos` bytes long.
    const remnant = Buffer.alloc(stream.pos);
    stream.bufToStore.copy(remnant, 0, 0, stream.pos);
    const lastChunkDoc = createChunkDoc(stream.id, stream.n, remnant);
    // An aborted stream must not write its remnant.
    if (checkAborted(stream, callback)) {
        return false;
    }
    const onInserted = () => {
        --stream.state.outstandingRequests;
        checkDone(stream);
    };
    const reportFailure = error => __handleError(stream, error);
    stream.chunks.insertOne(lastChunkDoc, getWriteOptions(stream)).then(onInserted, reportFailure);
    return true;
}
/**
 * Tells whether the stream has been aborted, notifying `callback` if so.
 *
 * @param stream - The upload stream to inspect
 * @param callback - When it is a function and the stream is aborted, it is
 *   called with a `MongoAPIError`
 * @returns `true` when the stream is aborted, otherwise `false`
 */
function checkAborted(stream, callback) {
    if (!stream.state.aborted) {
        return false;
    }
    if (typeof callback === 'function') {
        // TODO(NODE-3485): Replace with MongoGridFSStreamClosedError
        callback(new error_1.MongoAPIError('Stream has been aborted'));
    }
    return true;
}
|  | //# sourceMappingURL=upload.js.map
 |