You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							342 lines
						
					
					
						
							12 KiB
						
					
					
				
			
		
		
	
	
							342 lines
						
					
					
						
							12 KiB
						
					
					
				"use strict";
 | 
						|
Object.defineProperty(exports, "__esModule", { value: true });
 | 
						|
exports.GridFSBucketWriteStream = void 0;
 | 
						|
const stream_1 = require("stream");
 | 
						|
const bson_1 = require("../bson");
 | 
						|
const error_1 = require("../error");
 | 
						|
const write_concern_1 = require("./../write_concern");
 | 
						|
/**
 | 
						|
 * A writable stream that enables you to write buffers to GridFS.
 | 
						|
 *
 | 
						|
 * Do not instantiate this class directly. Use `openUploadStream()` instead.
 | 
						|
 * @public
 | 
						|
 */
 | 
						|
class GridFSBucketWriteStream extends stream_1.Writable {
 | 
						|
    /**
 | 
						|
     * @param bucket - Handle for this stream's corresponding bucket
 | 
						|
     * @param filename - The value of the 'filename' key in the files doc
 | 
						|
     * @param options - Optional settings.
 | 
						|
     * @internal
 | 
						|
     */
 | 
						|
    constructor(bucket, filename, options) {
 | 
						|
        super();
 | 
						|
        options = options ?? {};
 | 
						|
        this.bucket = bucket;
 | 
						|
        this.chunks = bucket.s._chunksCollection;
 | 
						|
        this.filename = filename;
 | 
						|
        this.files = bucket.s._filesCollection;
 | 
						|
        this.options = options;
 | 
						|
        this.writeConcern = write_concern_1.WriteConcern.fromOptions(options) || bucket.s.options.writeConcern;
 | 
						|
        // Signals the write is all done
 | 
						|
        this.done = false;
 | 
						|
        this.id = options.id ? options.id : new bson_1.ObjectId();
 | 
						|
        // properly inherit the default chunksize from parent
 | 
						|
        this.chunkSizeBytes = options.chunkSizeBytes || this.bucket.s.options.chunkSizeBytes;
 | 
						|
        this.bufToStore = Buffer.alloc(this.chunkSizeBytes);
 | 
						|
        this.length = 0;
 | 
						|
        this.n = 0;
 | 
						|
        this.pos = 0;
 | 
						|
        this.state = {
 | 
						|
            streamEnd: false,
 | 
						|
            outstandingRequests: 0,
 | 
						|
            errored: false,
 | 
						|
            aborted: false
 | 
						|
        };
 | 
						|
        if (!this.bucket.s.calledOpenUploadStream) {
 | 
						|
            this.bucket.s.calledOpenUploadStream = true;
 | 
						|
            checkIndexes(this).then(() => {
 | 
						|
                this.bucket.s.checkedIndexes = true;
 | 
						|
                this.bucket.emit('index');
 | 
						|
            }, () => null);
 | 
						|
        }
 | 
						|
    }
 | 
						|
    write(chunk, encodingOrCallback, callback) {
 | 
						|
        const encoding = typeof encodingOrCallback === 'function' ? undefined : encodingOrCallback;
 | 
						|
        callback = typeof encodingOrCallback === 'function' ? encodingOrCallback : callback;
 | 
						|
        return waitForIndexes(this, () => doWrite(this, chunk, encoding, callback));
 | 
						|
    }
 | 
						|
    /**
 | 
						|
     * Places this write stream into an aborted state (all future writes fail)
 | 
						|
     * and deletes all chunks that have already been written.
 | 
						|
     */
 | 
						|
    async abort() {
 | 
						|
        if (this.state.streamEnd) {
 | 
						|
            // TODO(NODE-3485): Replace with MongoGridFSStreamClosed
 | 
						|
            throw new error_1.MongoAPIError('Cannot abort a stream that has already completed');
 | 
						|
        }
 | 
						|
        if (this.state.aborted) {
 | 
						|
            // TODO(NODE-3485): Replace with MongoGridFSStreamClosed
 | 
						|
            throw new error_1.MongoAPIError('Cannot call abort() on a stream twice');
 | 
						|
        }
 | 
						|
        this.state.aborted = true;
 | 
						|
        await this.chunks.deleteMany({ files_id: this.id });
 | 
						|
    }
 | 
						|
    end(chunkOrCallback, encodingOrCallback, callback) {
 | 
						|
        const chunk = typeof chunkOrCallback === 'function' ? undefined : chunkOrCallback;
 | 
						|
        const encoding = typeof encodingOrCallback === 'function' ? undefined : encodingOrCallback;
 | 
						|
        callback =
 | 
						|
            typeof chunkOrCallback === 'function'
 | 
						|
                ? chunkOrCallback
 | 
						|
                : typeof encodingOrCallback === 'function'
 | 
						|
                    ? encodingOrCallback
 | 
						|
                    : callback;
 | 
						|
        if (this.state.streamEnd || checkAborted(this, callback))
 | 
						|
            return this;
 | 
						|
        this.state.streamEnd = true;
 | 
						|
        if (callback) {
 | 
						|
            this.once(GridFSBucketWriteStream.FINISH, (result) => {
 | 
						|
                if (callback)
 | 
						|
                    callback(undefined, result);
 | 
						|
            });
 | 
						|
        }
 | 
						|
        if (!chunk) {
 | 
						|
            waitForIndexes(this, () => !!writeRemnant(this));
 | 
						|
            return this;
 | 
						|
        }
 | 
						|
        this.write(chunk, encoding, () => {
 | 
						|
            writeRemnant(this);
 | 
						|
        });
 | 
						|
        return this;
 | 
						|
    }
 | 
						|
}
 | 
						|
exports.GridFSBucketWriteStream = GridFSBucketWriteStream;
 | 
						|
/** @event */
 | 
						|
GridFSBucketWriteStream.CLOSE = 'close';
 | 
						|
/** @event */
 | 
						|
GridFSBucketWriteStream.ERROR = 'error';
 | 
						|
/**
 | 
						|
 * `end()` was called and the write stream successfully wrote the file metadata and all the chunks to MongoDB.
 | 
						|
 * @event
 | 
						|
 */
 | 
						|
GridFSBucketWriteStream.FINISH = 'finish';
 | 
						|
function __handleError(stream, error, callback) {
 | 
						|
    if (stream.state.errored) {
 | 
						|
        return;
 | 
						|
    }
 | 
						|
    stream.state.errored = true;
 | 
						|
    if (callback) {
 | 
						|
        return callback(error);
 | 
						|
    }
 | 
						|
    stream.emit(GridFSBucketWriteStream.ERROR, error);
 | 
						|
}
 | 
						|
function createChunkDoc(filesId, n, data) {
 | 
						|
    return {
 | 
						|
        _id: new bson_1.ObjectId(),
 | 
						|
        files_id: filesId,
 | 
						|
        n,
 | 
						|
        data
 | 
						|
    };
 | 
						|
}
 | 
						|
async function checkChunksIndex(stream) {
 | 
						|
    const index = { files_id: 1, n: 1 };
 | 
						|
    let indexes;
 | 
						|
    try {
 | 
						|
        indexes = await stream.chunks.listIndexes().toArray();
 | 
						|
    }
 | 
						|
    catch (error) {
 | 
						|
        if (error instanceof error_1.MongoError && error.code === error_1.MONGODB_ERROR_CODES.NamespaceNotFound) {
 | 
						|
            indexes = [];
 | 
						|
        }
 | 
						|
        else {
 | 
						|
            throw error;
 | 
						|
        }
 | 
						|
    }
 | 
						|
    const hasChunksIndex = !!indexes.find(index => {
 | 
						|
        const keys = Object.keys(index.key);
 | 
						|
        if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) {
 | 
						|
            return true;
 | 
						|
        }
 | 
						|
        return false;
 | 
						|
    });
 | 
						|
    if (!hasChunksIndex) {
 | 
						|
        const writeConcernOptions = getWriteOptions(stream);
 | 
						|
        await stream.chunks.createIndex(index, {
 | 
						|
            ...writeConcernOptions,
 | 
						|
            background: true,
 | 
						|
            unique: true
 | 
						|
        });
 | 
						|
    }
 | 
						|
}
 | 
						|
function checkDone(stream, callback) {
 | 
						|
    if (stream.done)
 | 
						|
        return true;
 | 
						|
    if (stream.state.streamEnd && stream.state.outstandingRequests === 0 && !stream.state.errored) {
 | 
						|
        // Set done so we do not trigger duplicate createFilesDoc
 | 
						|
        stream.done = true;
 | 
						|
        // Create a new files doc
 | 
						|
        const filesDoc = createFilesDoc(stream.id, stream.length, stream.chunkSizeBytes, stream.filename, stream.options.contentType, stream.options.aliases, stream.options.metadata);
 | 
						|
        if (checkAborted(stream, callback)) {
 | 
						|
            return false;
 | 
						|
        }
 | 
						|
        stream.files.insertOne(filesDoc, getWriteOptions(stream)).then(() => {
 | 
						|
            stream.emit(GridFSBucketWriteStream.FINISH, filesDoc);
 | 
						|
            stream.emit(GridFSBucketWriteStream.CLOSE);
 | 
						|
        }, error => {
 | 
						|
            return __handleError(stream, error, callback);
 | 
						|
        });
 | 
						|
        return true;
 | 
						|
    }
 | 
						|
    return false;
 | 
						|
}
 | 
						|
async function checkIndexes(stream) {
 | 
						|
    const doc = await stream.files.findOne({}, { projection: { _id: 1 } });
 | 
						|
    if (doc != null) {
 | 
						|
        // If at least one document exists assume the collection has the required index
 | 
						|
        return;
 | 
						|
    }
 | 
						|
    const index = { filename: 1, uploadDate: 1 };
 | 
						|
    let indexes;
 | 
						|
    try {
 | 
						|
        indexes = await stream.files.listIndexes().toArray();
 | 
						|
    }
 | 
						|
    catch (error) {
 | 
						|
        if (error instanceof error_1.MongoError && error.code === error_1.MONGODB_ERROR_CODES.NamespaceNotFound) {
 | 
						|
            indexes = [];
 | 
						|
        }
 | 
						|
        else {
 | 
						|
            throw error;
 | 
						|
        }
 | 
						|
    }
 | 
						|
    const hasFileIndex = !!indexes.find(index => {
 | 
						|
        const keys = Object.keys(index.key);
 | 
						|
        if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) {
 | 
						|
            return true;
 | 
						|
        }
 | 
						|
        return false;
 | 
						|
    });
 | 
						|
    if (!hasFileIndex) {
 | 
						|
        await stream.files.createIndex(index, { background: false });
 | 
						|
    }
 | 
						|
    await checkChunksIndex(stream);
 | 
						|
}
 | 
						|
function createFilesDoc(_id, length, chunkSize, filename, contentType, aliases, metadata) {
 | 
						|
    const ret = {
 | 
						|
        _id,
 | 
						|
        length,
 | 
						|
        chunkSize,
 | 
						|
        uploadDate: new Date(),
 | 
						|
        filename
 | 
						|
    };
 | 
						|
    if (contentType) {
 | 
						|
        ret.contentType = contentType;
 | 
						|
    }
 | 
						|
    if (aliases) {
 | 
						|
        ret.aliases = aliases;
 | 
						|
    }
 | 
						|
    if (metadata) {
 | 
						|
        ret.metadata = metadata;
 | 
						|
    }
 | 
						|
    return ret;
 | 
						|
}
 | 
						|
function doWrite(stream, chunk, encoding, callback) {
 | 
						|
    if (checkAborted(stream, callback)) {
 | 
						|
        return false;
 | 
						|
    }
 | 
						|
    const inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
 | 
						|
    stream.length += inputBuf.length;
 | 
						|
    // Input is small enough to fit in our buffer
 | 
						|
    if (stream.pos + inputBuf.length < stream.chunkSizeBytes) {
 | 
						|
        inputBuf.copy(stream.bufToStore, stream.pos);
 | 
						|
        stream.pos += inputBuf.length;
 | 
						|
        callback && callback();
 | 
						|
        // Note that we reverse the typical semantics of write's return value
 | 
						|
        // to be compatible with node's `.pipe()` function.
 | 
						|
        // True means client can keep writing.
 | 
						|
        return true;
 | 
						|
    }
 | 
						|
    // Otherwise, buffer is too big for current chunk, so we need to flush
 | 
						|
    // to MongoDB.
 | 
						|
    let inputBufRemaining = inputBuf.length;
 | 
						|
    let spaceRemaining = stream.chunkSizeBytes - stream.pos;
 | 
						|
    let numToCopy = Math.min(spaceRemaining, inputBuf.length);
 | 
						|
    let outstandingRequests = 0;
 | 
						|
    while (inputBufRemaining > 0) {
 | 
						|
        const inputBufPos = inputBuf.length - inputBufRemaining;
 | 
						|
        inputBuf.copy(stream.bufToStore, stream.pos, inputBufPos, inputBufPos + numToCopy);
 | 
						|
        stream.pos += numToCopy;
 | 
						|
        spaceRemaining -= numToCopy;
 | 
						|
        let doc;
 | 
						|
        if (spaceRemaining === 0) {
 | 
						|
            doc = createChunkDoc(stream.id, stream.n, Buffer.from(stream.bufToStore));
 | 
						|
            ++stream.state.outstandingRequests;
 | 
						|
            ++outstandingRequests;
 | 
						|
            if (checkAborted(stream, callback)) {
 | 
						|
                return false;
 | 
						|
            }
 | 
						|
            stream.chunks.insertOne(doc, getWriteOptions(stream)).then(() => {
 | 
						|
                --stream.state.outstandingRequests;
 | 
						|
                --outstandingRequests;
 | 
						|
                if (!outstandingRequests) {
 | 
						|
                    stream.emit('drain', doc);
 | 
						|
                    callback && callback();
 | 
						|
                    checkDone(stream);
 | 
						|
                }
 | 
						|
            }, error => {
 | 
						|
                return __handleError(stream, error);
 | 
						|
            });
 | 
						|
            spaceRemaining = stream.chunkSizeBytes;
 | 
						|
            stream.pos = 0;
 | 
						|
            ++stream.n;
 | 
						|
        }
 | 
						|
        inputBufRemaining -= numToCopy;
 | 
						|
        numToCopy = Math.min(spaceRemaining, inputBufRemaining);
 | 
						|
    }
 | 
						|
    // Note that we reverse the typical semantics of write's return value
 | 
						|
    // to be compatible with node's `.pipe()` function.
 | 
						|
    // False means the client should wait for the 'drain' event.
 | 
						|
    return false;
 | 
						|
}
 | 
						|
function getWriteOptions(stream) {
 | 
						|
    const obj = {};
 | 
						|
    if (stream.writeConcern) {
 | 
						|
        obj.writeConcern = {
 | 
						|
            w: stream.writeConcern.w,
 | 
						|
            wtimeout: stream.writeConcern.wtimeout,
 | 
						|
            j: stream.writeConcern.j
 | 
						|
        };
 | 
						|
    }
 | 
						|
    return obj;
 | 
						|
}
 | 
						|
function waitForIndexes(stream, callback) {
 | 
						|
    if (stream.bucket.s.checkedIndexes) {
 | 
						|
        return callback(false);
 | 
						|
    }
 | 
						|
    stream.bucket.once('index', () => {
 | 
						|
        callback(true);
 | 
						|
    });
 | 
						|
    return true;
 | 
						|
}
 | 
						|
function writeRemnant(stream, callback) {
 | 
						|
    // Buffer is empty, so don't bother to insert
 | 
						|
    if (stream.pos === 0) {
 | 
						|
        return checkDone(stream, callback);
 | 
						|
    }
 | 
						|
    ++stream.state.outstandingRequests;
 | 
						|
    // Create a new buffer to make sure the buffer isn't bigger than it needs
 | 
						|
    // to be.
 | 
						|
    const remnant = Buffer.alloc(stream.pos);
 | 
						|
    stream.bufToStore.copy(remnant, 0, 0, stream.pos);
 | 
						|
    const doc = createChunkDoc(stream.id, stream.n, remnant);
 | 
						|
    // If the stream was aborted, do not write remnant
 | 
						|
    if (checkAborted(stream, callback)) {
 | 
						|
        return false;
 | 
						|
    }
 | 
						|
    stream.chunks.insertOne(doc, getWriteOptions(stream)).then(() => {
 | 
						|
        --stream.state.outstandingRequests;
 | 
						|
        checkDone(stream);
 | 
						|
    }, error => {
 | 
						|
        return __handleError(stream, error);
 | 
						|
    });
 | 
						|
    return true;
 | 
						|
}
 | 
						|
function checkAborted(stream, callback) {
 | 
						|
    if (stream.state.aborted) {
 | 
						|
        if (typeof callback === 'function') {
 | 
						|
            // TODO(NODE-3485): Replace with MongoGridFSStreamClosedError
 | 
						|
            callback(new error_1.MongoAPIError('Stream has been aborted'));
 | 
						|
        }
 | 
						|
        return true;
 | 
						|
    }
 | 
						|
    return false;
 | 
						|
}
 | 
						|
//# sourceMappingURL=upload.js.map
 |