You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							672 lines
						
					
					
						
							25 KiB
						
					
					
				
			
		
		
	
	
							672 lines
						
					
					
						
							25 KiB
						
					
					
				| "use strict";
 | |
| Object.defineProperty(exports, "__esModule", { value: true });
 | |
| exports.assertUninitialized = exports.next = exports.AbstractCursor = exports.CURSOR_FLAGS = void 0;
 | |
| const stream_1 = require("stream");
 | |
| const util_1 = require("util");
 | |
| const bson_1 = require("../bson");
 | |
| const error_1 = require("../error");
 | |
| const mongo_types_1 = require("../mongo_types");
 | |
| const execute_operation_1 = require("../operations/execute_operation");
 | |
| const get_more_1 = require("../operations/get_more");
 | |
| const kill_cursors_1 = require("../operations/kill_cursors");
 | |
| const read_concern_1 = require("../read_concern");
 | |
| const read_preference_1 = require("../read_preference");
 | |
| const sessions_1 = require("../sessions");
 | |
| const utils_1 = require("../utils");
 | |
| /** @internal */
 | |
| const kId = Symbol('id');
 | |
| /** @internal */
 | |
| const kDocuments = Symbol('documents');
 | |
| /** @internal */
 | |
| const kServer = Symbol('server');
 | |
| /** @internal */
 | |
| const kNamespace = Symbol('namespace');
 | |
| /** @internal */
 | |
| const kClient = Symbol('client');
 | |
| /** @internal */
 | |
| const kSession = Symbol('session');
 | |
| /** @internal */
 | |
| const kOptions = Symbol('options');
 | |
| /** @internal */
 | |
| const kTransform = Symbol('transform');
 | |
| /** @internal */
 | |
| const kInitialized = Symbol('initialized');
 | |
| /** @internal */
 | |
| const kClosed = Symbol('closed');
 | |
| /** @internal */
 | |
| const kKilled = Symbol('killed');
 | |
| /** @internal */
 | |
| const kInit = Symbol('kInit');
 | |
| /** @public */
 | |
| exports.CURSOR_FLAGS = [
 | |
|     'tailable',
 | |
|     'oplogReplay',
 | |
|     'noCursorTimeout',
 | |
|     'awaitData',
 | |
|     'exhaust',
 | |
|     'partial'
 | |
| ];
 | |
| /** @public */
 | |
| class AbstractCursor extends mongo_types_1.TypedEventEmitter {
 | |
|     /** @internal */
 | |
|     constructor(client, namespace, options = {}) {
 | |
|         super();
 | |
|         if (!client.s.isMongoClient) {
 | |
|             throw new error_1.MongoRuntimeError('Cursor must be constructed with MongoClient');
 | |
|         }
 | |
|         this[kClient] = client;
 | |
|         this[kNamespace] = namespace;
 | |
|         this[kId] = null;
 | |
|         this[kDocuments] = new utils_1.List();
 | |
|         this[kInitialized] = false;
 | |
|         this[kClosed] = false;
 | |
|         this[kKilled] = false;
 | |
|         this[kOptions] = {
 | |
|             readPreference: options.readPreference && options.readPreference instanceof read_preference_1.ReadPreference
 | |
|                 ? options.readPreference
 | |
|                 : read_preference_1.ReadPreference.primary,
 | |
|             ...(0, bson_1.pluckBSONSerializeOptions)(options)
 | |
|         };
 | |
|         const readConcern = read_concern_1.ReadConcern.fromOptions(options);
 | |
|         if (readConcern) {
 | |
|             this[kOptions].readConcern = readConcern;
 | |
|         }
 | |
|         if (typeof options.batchSize === 'number') {
 | |
|             this[kOptions].batchSize = options.batchSize;
 | |
|         }
 | |
|         // we check for undefined specifically here to allow falsy values
 | |
|         // eslint-disable-next-line no-restricted-syntax
 | |
|         if (options.comment !== undefined) {
 | |
|             this[kOptions].comment = options.comment;
 | |
|         }
 | |
|         if (typeof options.maxTimeMS === 'number') {
 | |
|             this[kOptions].maxTimeMS = options.maxTimeMS;
 | |
|         }
 | |
|         if (typeof options.maxAwaitTimeMS === 'number') {
 | |
|             this[kOptions].maxAwaitTimeMS = options.maxAwaitTimeMS;
 | |
|         }
 | |
|         if (options.session instanceof sessions_1.ClientSession) {
 | |
|             this[kSession] = options.session;
 | |
|         }
 | |
|         else {
 | |
|             this[kSession] = this[kClient].startSession({ owner: this, explicit: false });
 | |
|         }
 | |
|     }
 | |
|     get id() {
 | |
|         return this[kId] ?? undefined;
 | |
|     }
 | |
|     /** @internal */
 | |
|     get client() {
 | |
|         return this[kClient];
 | |
|     }
 | |
|     /** @internal */
 | |
|     get server() {
 | |
|         return this[kServer];
 | |
|     }
 | |
|     get namespace() {
 | |
|         return this[kNamespace];
 | |
|     }
 | |
|     get readPreference() {
 | |
|         return this[kOptions].readPreference;
 | |
|     }
 | |
|     get readConcern() {
 | |
|         return this[kOptions].readConcern;
 | |
|     }
 | |
|     /** @internal */
 | |
|     get session() {
 | |
|         return this[kSession];
 | |
|     }
 | |
|     set session(clientSession) {
 | |
|         this[kSession] = clientSession;
 | |
|     }
 | |
|     /** @internal */
 | |
|     get cursorOptions() {
 | |
|         return this[kOptions];
 | |
|     }
 | |
|     get closed() {
 | |
|         return this[kClosed];
 | |
|     }
 | |
|     get killed() {
 | |
|         return this[kKilled];
 | |
|     }
 | |
|     get loadBalanced() {
 | |
|         return !!this[kClient].topology?.loadBalanced;
 | |
|     }
 | |
|     /** Returns current buffered documents length */
 | |
|     bufferedCount() {
 | |
|         return this[kDocuments].length;
 | |
|     }
 | |
|     /** Returns current buffered documents */
 | |
|     readBufferedDocuments(number) {
 | |
|         const bufferedDocs = [];
 | |
|         const documentsToRead = Math.min(number ?? this[kDocuments].length, this[kDocuments].length);
 | |
|         for (let count = 0; count < documentsToRead; count++) {
 | |
|             const document = this[kDocuments].shift();
 | |
|             if (document != null) {
 | |
|                 bufferedDocs.push(document);
 | |
|             }
 | |
|         }
 | |
|         return bufferedDocs;
 | |
|     }
 | |
|     async *[Symbol.asyncIterator]() {
 | |
|         if (this.closed) {
 | |
|             return;
 | |
|         }
 | |
|         try {
 | |
|             while (true) {
 | |
|                 const document = await this.next();
 | |
|                 // Intentional strict null check, because users can map cursors to falsey values.
 | |
|                 // We allow mapping to all values except for null.
 | |
|                 // eslint-disable-next-line no-restricted-syntax
 | |
|                 if (document === null) {
 | |
|                     if (!this.closed) {
 | |
|                         const message = 'Cursor returned a `null` document, but the cursor is not exhausted.  Mapping documents to `null` is not supported in the cursor transform.';
 | |
|                         await cleanupCursorAsync(this, { needsToEmitClosed: true }).catch(() => null);
 | |
|                         throw new error_1.MongoAPIError(message);
 | |
|                     }
 | |
|                     break;
 | |
|                 }
 | |
|                 yield document;
 | |
|                 if (this[kId] === bson_1.Long.ZERO) {
 | |
|                     // Cursor exhausted
 | |
|                     break;
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
|         finally {
 | |
|             // Only close the cursor if it has not already been closed. This finally clause handles
 | |
|             // the case when a user would break out of a for await of loop early.
 | |
|             if (!this.closed) {
 | |
|                 await this.close().catch(() => null);
 | |
|             }
 | |
|         }
 | |
|     }
 | |
|     stream(options) {
 | |
|         if (options?.transform) {
 | |
|             const transform = options.transform;
 | |
|             const readable = new ReadableCursorStream(this);
 | |
|             return readable.pipe(new stream_1.Transform({
 | |
|                 objectMode: true,
 | |
|                 highWaterMark: 1,
 | |
|                 transform(chunk, _, callback) {
 | |
|                     try {
 | |
|                         const transformed = transform(chunk);
 | |
|                         callback(undefined, transformed);
 | |
|                     }
 | |
|                     catch (err) {
 | |
|                         callback(err);
 | |
|                     }
 | |
|                 }
 | |
|             }));
 | |
|         }
 | |
|         return new ReadableCursorStream(this);
 | |
|     }
 | |
|     async hasNext() {
 | |
|         if (this[kId] === bson_1.Long.ZERO) {
 | |
|             return false;
 | |
|         }
 | |
|         if (this[kDocuments].length !== 0) {
 | |
|             return true;
 | |
|         }
 | |
|         const doc = await nextAsync(this, true);
 | |
|         if (doc) {
 | |
|             this[kDocuments].unshift(doc);
 | |
|             return true;
 | |
|         }
 | |
|         return false;
 | |
|     }
 | |
|     /** Get the next available document from the cursor, returns null if no more documents are available. */
 | |
|     async next() {
 | |
|         if (this[kId] === bson_1.Long.ZERO) {
 | |
|             throw new error_1.MongoCursorExhaustedError();
 | |
|         }
 | |
|         return nextAsync(this, true);
 | |
|     }
 | |
|     /**
 | |
|      * Try to get the next available document from the cursor or `null` if an empty batch is returned
 | |
|      */
 | |
|     async tryNext() {
 | |
|         if (this[kId] === bson_1.Long.ZERO) {
 | |
|             throw new error_1.MongoCursorExhaustedError();
 | |
|         }
 | |
|         return nextAsync(this, false);
 | |
|     }
 | |
|     /**
 | |
|      * Iterates over all the documents for this cursor using the iterator, callback pattern.
 | |
|      *
 | |
|      * If the iterator returns `false`, iteration will stop.
 | |
|      *
 | |
|      * @param iterator - The iteration callback.
 | |
|      */
 | |
|     async forEach(iterator) {
 | |
|         if (typeof iterator !== 'function') {
 | |
|             throw new error_1.MongoInvalidArgumentError('Argument "iterator" must be a function');
 | |
|         }
 | |
|         for await (const document of this) {
 | |
|             const result = iterator(document);
 | |
|             if (result === false) {
 | |
|                 break;
 | |
|             }
 | |
|         }
 | |
|     }
 | |
|     async close() {
 | |
|         const needsToEmitClosed = !this[kClosed];
 | |
|         this[kClosed] = true;
 | |
|         await cleanupCursorAsync(this, { needsToEmitClosed });
 | |
|     }
 | |
|     /**
 | |
|      * Returns an array of documents. The caller is responsible for making sure that there
 | |
|      * is enough memory to store the results. Note that the array only contains partial
 | |
|      * results when this cursor had been previously accessed. In that case,
 | |
|      * cursor.rewind() can be used to reset the cursor.
 | |
|      */
 | |
|     async toArray() {
 | |
|         const array = [];
 | |
|         for await (const document of this) {
 | |
|             array.push(document);
 | |
|         }
 | |
|         return array;
 | |
|     }
 | |
|     /**
 | |
|      * Add a cursor flag to the cursor
 | |
|      *
 | |
|      * @param flag - The flag to set, must be one of following ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'partial' -.
 | |
|      * @param value - The flag boolean value.
 | |
|      */
 | |
|     addCursorFlag(flag, value) {
 | |
|         assertUninitialized(this);
 | |
|         if (!exports.CURSOR_FLAGS.includes(flag)) {
 | |
|             throw new error_1.MongoInvalidArgumentError(`Flag ${flag} is not one of ${exports.CURSOR_FLAGS}`);
 | |
|         }
 | |
|         if (typeof value !== 'boolean') {
 | |
|             throw new error_1.MongoInvalidArgumentError(`Flag ${flag} must be a boolean value`);
 | |
|         }
 | |
|         this[kOptions][flag] = value;
 | |
|         return this;
 | |
|     }
 | |
|     /**
 | |
|      * Map all documents using the provided function
 | |
|      * If there is a transform set on the cursor, that will be called first and the result passed to
 | |
|      * this function's transform.
 | |
|      *
 | |
|      * @remarks
 | |
|      *
 | |
|      * **Note** Cursors use `null` internally to indicate that there are no more documents in the cursor. Providing a mapping
 | |
|      * function that maps values to `null` will result in the cursor closing itself before it has finished iterating
 | |
|      * all documents.  This will **not** result in a memory leak, just surprising behavior.  For example:
 | |
|      *
 | |
|      * ```typescript
 | |
|      * const cursor = collection.find({});
 | |
|      * cursor.map(() => null);
 | |
|      *
 | |
|      * const documents = await cursor.toArray();
 | |
|      * // documents is always [], regardless of how many documents are in the collection.
 | |
|      * ```
 | |
|      *
 | |
|      * Other falsey values are allowed:
 | |
|      *
 | |
|      * ```typescript
 | |
|      * const cursor = collection.find({});
 | |
|      * cursor.map(() => '');
 | |
|      *
 | |
|      * const documents = await cursor.toArray();
 | |
|      * // documents is now an array of empty strings
 | |
|      * ```
 | |
|      *
 | |
|      * **Note for Typescript Users:** adding a transform changes the return type of the iteration of this cursor,
 | |
|      * it **does not** return a new instance of a cursor. This means when calling map,
 | |
|      * you should always assign the result to a new variable in order to get a correctly typed cursor variable.
 | |
|      * Take note of the following example:
 | |
|      *
 | |
|      * @example
 | |
|      * ```typescript
 | |
|      * const cursor: FindCursor<Document> = coll.find();
 | |
|      * const mappedCursor: FindCursor<number> = cursor.map(doc => Object.keys(doc).length);
 | |
|      * const keyCounts: number[] = await mappedCursor.toArray(); // cursor.toArray() still returns Document[]
 | |
|      * ```
 | |
|      * @param transform - The mapping transformation method.
 | |
|      */
 | |
|     map(transform) {
 | |
|         assertUninitialized(this);
 | |
|         const oldTransform = this[kTransform]; // TODO(NODE-3283): Improve transform typing
 | |
|         if (oldTransform) {
 | |
|             this[kTransform] = doc => {
 | |
|                 return transform(oldTransform(doc));
 | |
|             };
 | |
|         }
 | |
|         else {
 | |
|             this[kTransform] = transform;
 | |
|         }
 | |
|         return this;
 | |
|     }
 | |
|     /**
 | |
|      * Set the ReadPreference for the cursor.
 | |
|      *
 | |
|      * @param readPreference - The new read preference for the cursor.
 | |
|      */
 | |
|     withReadPreference(readPreference) {
 | |
|         assertUninitialized(this);
 | |
|         if (readPreference instanceof read_preference_1.ReadPreference) {
 | |
|             this[kOptions].readPreference = readPreference;
 | |
|         }
 | |
|         else if (typeof readPreference === 'string') {
 | |
|             this[kOptions].readPreference = read_preference_1.ReadPreference.fromString(readPreference);
 | |
|         }
 | |
|         else {
 | |
|             throw new error_1.MongoInvalidArgumentError(`Invalid read preference: ${readPreference}`);
 | |
|         }
 | |
|         return this;
 | |
|     }
 | |
|     /**
 | |
|      * Set the ReadPreference for the cursor.
 | |
|      *
 | |
|      * @param readPreference - The new read preference for the cursor.
 | |
|      */
 | |
|     withReadConcern(readConcern) {
 | |
|         assertUninitialized(this);
 | |
|         const resolvedReadConcern = read_concern_1.ReadConcern.fromOptions({ readConcern });
 | |
|         if (resolvedReadConcern) {
 | |
|             this[kOptions].readConcern = resolvedReadConcern;
 | |
|         }
 | |
|         return this;
 | |
|     }
 | |
|     /**
 | |
|      * Set a maxTimeMS on the cursor query, allowing for hard timeout limits on queries (Only supported on MongoDB 2.6 or higher)
 | |
|      *
 | |
|      * @param value - Number of milliseconds to wait before aborting the query.
 | |
|      */
 | |
|     maxTimeMS(value) {
 | |
|         assertUninitialized(this);
 | |
|         if (typeof value !== 'number') {
 | |
|             throw new error_1.MongoInvalidArgumentError('Argument for maxTimeMS must be a number');
 | |
|         }
 | |
|         this[kOptions].maxTimeMS = value;
 | |
|         return this;
 | |
|     }
 | |
|     /**
 | |
|      * Set the batch size for the cursor.
 | |
|      *
 | |
|      * @param value - The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/find/|find command documentation}.
 | |
|      */
 | |
|     batchSize(value) {
 | |
|         assertUninitialized(this);
 | |
|         if (this[kOptions].tailable) {
 | |
|             throw new error_1.MongoTailableCursorError('Tailable cursor does not support batchSize');
 | |
|         }
 | |
|         if (typeof value !== 'number') {
 | |
|             throw new error_1.MongoInvalidArgumentError('Operation "batchSize" requires an integer');
 | |
|         }
 | |
|         this[kOptions].batchSize = value;
 | |
|         return this;
 | |
|     }
 | |
|     /**
 | |
|      * Rewind this cursor to its uninitialized state. Any options that are present on the cursor will
 | |
|      * remain in effect. Iterating this cursor will cause new queries to be sent to the server, even
 | |
|      * if the resultant data has already been retrieved by this cursor.
 | |
|      */
 | |
|     rewind() {
 | |
|         if (!this[kInitialized]) {
 | |
|             return;
 | |
|         }
 | |
|         this[kId] = null;
 | |
|         this[kDocuments].clear();
 | |
|         this[kClosed] = false;
 | |
|         this[kKilled] = false;
 | |
|         this[kInitialized] = false;
 | |
|         const session = this[kSession];
 | |
|         if (session) {
 | |
|             // We only want to end this session if we created it, and it hasn't ended yet
 | |
|             if (session.explicit === false) {
 | |
|                 if (!session.hasEnded) {
 | |
|                     session.endSession().catch(() => null);
 | |
|                 }
 | |
|                 this[kSession] = this.client.startSession({ owner: this, explicit: false });
 | |
|             }
 | |
|         }
 | |
|     }
 | |
|     /** @internal */
 | |
|     _getMore(batchSize, callback) {
 | |
|         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
 | |
|         const getMoreOperation = new get_more_1.GetMoreOperation(this[kNamespace], this[kId], this[kServer], {
 | |
|             ...this[kOptions],
 | |
|             session: this[kSession],
 | |
|             batchSize
 | |
|         });
 | |
|         (0, execute_operation_1.executeOperation)(this[kClient], getMoreOperation, callback);
 | |
|     }
 | |
|     /**
 | |
|      * @internal
 | |
|      *
 | |
|      * This function is exposed for the unified test runner's createChangeStream
 | |
|      * operation.  We cannot refactor to use the abstract _initialize method without
 | |
|      * a significant refactor.
 | |
|      */
 | |
|     [kInit](callback) {
 | |
|         this._initialize(this[kSession], (error, state) => {
 | |
|             if (state) {
 | |
|                 const response = state.response;
 | |
|                 this[kServer] = state.server;
 | |
|                 if (response.cursor) {
 | |
|                     // TODO(NODE-2674): Preserve int64 sent from MongoDB
 | |
|                     this[kId] =
 | |
|                         typeof response.cursor.id === 'number'
 | |
|                             ? bson_1.Long.fromNumber(response.cursor.id)
 | |
|                             : response.cursor.id;
 | |
|                     if (response.cursor.ns) {
 | |
|                         this[kNamespace] = (0, utils_1.ns)(response.cursor.ns);
 | |
|                     }
 | |
|                     this[kDocuments].pushMany(response.cursor.firstBatch);
 | |
|                 }
 | |
|                 // When server responses return without a cursor document, we close this cursor
 | |
|                 // and return the raw server response. This is often the case for explain commands
 | |
|                 // for example
 | |
|                 if (this[kId] == null) {
 | |
|                     this[kId] = bson_1.Long.ZERO;
 | |
|                     // TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
 | |
|                     this[kDocuments].push(state.response);
 | |
|                 }
 | |
|             }
 | |
|             // the cursor is now initialized, even if an error occurred or it is dead
 | |
|             this[kInitialized] = true;
 | |
|             if (error) {
 | |
|                 return cleanupCursor(this, { error }, () => callback(error, undefined));
 | |
|             }
 | |
|             if (cursorIsDead(this)) {
 | |
|                 return cleanupCursor(this, undefined, () => callback());
 | |
|             }
 | |
|             callback();
 | |
|         });
 | |
|     }
 | |
| }
 | |
| exports.AbstractCursor = AbstractCursor;
 | |
| /** @event */
 | |
| AbstractCursor.CLOSE = 'close';
 | |
| function nextDocument(cursor) {
 | |
|     const doc = cursor[kDocuments].shift();
 | |
|     if (doc && cursor[kTransform]) {
 | |
|         return cursor[kTransform](doc);
 | |
|     }
 | |
|     return doc;
 | |
| }
 | |
| const nextAsync = (0, util_1.promisify)(next);
 | |
| /**
 | |
|  * @param cursor - the cursor on which to call `next`
 | |
|  * @param blocking - a boolean indicating whether or not the cursor should `block` until data
 | |
|  *     is available.  Generally, this flag is set to `false` because if the getMore returns no documents,
 | |
|  *     the cursor has been exhausted.  In certain scenarios (ChangeStreams, tailable await cursors and
 | |
|  *     `tryNext`, for example) blocking is necessary because a getMore returning no documents does
 | |
|  *     not indicate the end of the cursor.
 | |
|  * @param callback - callback to return the result to the caller
 | |
|  * @returns
 | |
|  */
 | |
| function next(cursor, blocking, callback) {
 | |
|     const cursorId = cursor[kId];
 | |
|     if (cursor.closed) {
 | |
|         return callback(undefined, null);
 | |
|     }
 | |
|     if (cursor[kDocuments].length !== 0) {
 | |
|         callback(undefined, nextDocument(cursor));
 | |
|         return;
 | |
|     }
 | |
|     if (cursorId == null) {
 | |
|         // All cursors must operate within a session, one must be made implicitly if not explicitly provided
 | |
|         cursor[kInit](err => {
 | |
|             if (err)
 | |
|                 return callback(err);
 | |
|             return next(cursor, blocking, callback);
 | |
|         });
 | |
|         return;
 | |
|     }
 | |
|     if (cursorIsDead(cursor)) {
 | |
|         return cleanupCursor(cursor, undefined, () => callback(undefined, null));
 | |
|     }
 | |
|     // otherwise need to call getMore
 | |
|     const batchSize = cursor[kOptions].batchSize || 1000;
 | |
|     cursor._getMore(batchSize, (error, response) => {
 | |
|         if (response) {
 | |
|             const cursorId = typeof response.cursor.id === 'number'
 | |
|                 ? bson_1.Long.fromNumber(response.cursor.id)
 | |
|                 : response.cursor.id;
 | |
|             cursor[kDocuments].pushMany(response.cursor.nextBatch);
 | |
|             cursor[kId] = cursorId;
 | |
|         }
 | |
|         if (error || cursorIsDead(cursor)) {
 | |
|             return cleanupCursor(cursor, { error }, () => callback(error, nextDocument(cursor)));
 | |
|         }
 | |
|         if (cursor[kDocuments].length === 0 && blocking === false) {
 | |
|             return callback(undefined, null);
 | |
|         }
 | |
|         next(cursor, blocking, callback);
 | |
|     });
 | |
| }
 | |
| exports.next = next;
 | |
| function cursorIsDead(cursor) {
 | |
|     const cursorId = cursor[kId];
 | |
|     return !!cursorId && cursorId.isZero();
 | |
| }
 | |
| const cleanupCursorAsync = (0, util_1.promisify)(cleanupCursor);
 | |
| function cleanupCursor(cursor, options, callback) {
 | |
|     const cursorId = cursor[kId];
 | |
|     const cursorNs = cursor[kNamespace];
 | |
|     const server = cursor[kServer];
 | |
|     const session = cursor[kSession];
 | |
|     const error = options?.error;
 | |
|     const needsToEmitClosed = options?.needsToEmitClosed ?? cursor[kDocuments].length === 0;
 | |
|     if (error) {
 | |
|         if (cursor.loadBalanced && error instanceof error_1.MongoNetworkError) {
 | |
|             return completeCleanup();
 | |
|         }
 | |
|     }
 | |
|     if (cursorId == null || server == null || cursorId.isZero() || cursorNs == null) {
 | |
|         if (needsToEmitClosed) {
 | |
|             cursor[kClosed] = true;
 | |
|             cursor[kId] = bson_1.Long.ZERO;
 | |
|             cursor.emit(AbstractCursor.CLOSE);
 | |
|         }
 | |
|         if (session) {
 | |
|             if (session.owner === cursor) {
 | |
|                 session.endSession({ error }).finally(() => {
 | |
|                     callback();
 | |
|                 });
 | |
|                 return;
 | |
|             }
 | |
|             if (!session.inTransaction()) {
 | |
|                 (0, sessions_1.maybeClearPinnedConnection)(session, { error });
 | |
|             }
 | |
|         }
 | |
|         return callback();
 | |
|     }
 | |
|     function completeCleanup() {
 | |
|         if (session) {
 | |
|             if (session.owner === cursor) {
 | |
|                 session.endSession({ error }).finally(() => {
 | |
|                     cursor.emit(AbstractCursor.CLOSE);
 | |
|                     callback();
 | |
|                 });
 | |
|                 return;
 | |
|             }
 | |
|             if (!session.inTransaction()) {
 | |
|                 (0, sessions_1.maybeClearPinnedConnection)(session, { error });
 | |
|             }
 | |
|         }
 | |
|         cursor.emit(AbstractCursor.CLOSE);
 | |
|         return callback();
 | |
|     }
 | |
|     cursor[kKilled] = true;
 | |
|     (0, execute_operation_1.executeOperation)(cursor[kClient], new kill_cursors_1.KillCursorsOperation(cursorId, cursorNs, server, { session })).finally(() => {
 | |
|         completeCleanup();
 | |
|     });
 | |
|     return;
 | |
| }
 | |
| /** @internal */
 | |
| function assertUninitialized(cursor) {
 | |
|     if (cursor[kInitialized]) {
 | |
|         throw new error_1.MongoCursorInUseError();
 | |
|     }
 | |
| }
 | |
| exports.assertUninitialized = assertUninitialized;
 | |
| class ReadableCursorStream extends stream_1.Readable {
 | |
|     constructor(cursor) {
 | |
|         super({
 | |
|             objectMode: true,
 | |
|             autoDestroy: false,
 | |
|             highWaterMark: 1
 | |
|         });
 | |
|         this._readInProgress = false;
 | |
|         this._cursor = cursor;
 | |
|     }
 | |
|     // eslint-disable-next-line @typescript-eslint/no-unused-vars
 | |
|     _read(size) {
 | |
|         if (!this._readInProgress) {
 | |
|             this._readInProgress = true;
 | |
|             this._readNext();
 | |
|         }
 | |
|     }
 | |
|     _destroy(error, callback) {
 | |
|         this._cursor.close().then(() => callback(error), closeError => callback(closeError));
 | |
|     }
 | |
|     _readNext() {
 | |
|         next(this._cursor, true, (err, result) => {
 | |
|             if (err) {
 | |
|                 // NOTE: This is questionable, but we have a test backing the behavior. It seems the
 | |
|                 //       desired behavior is that a stream ends cleanly when a user explicitly closes
 | |
|                 //       a client during iteration. Alternatively, we could do the "right" thing and
 | |
|                 //       propagate the error message by removing this special case.
 | |
|                 if (err.message.match(/server is closed/)) {
 | |
|                     this._cursor.close().catch(() => null);
 | |
|                     return this.push(null);
 | |
|                 }
 | |
|                 // NOTE: This is also perhaps questionable. The rationale here is that these errors tend
 | |
|                 //       to be "operation was interrupted", where a cursor has been closed but there is an
 | |
|                 //       active getMore in-flight. This used to check if the cursor was killed but once
 | |
|                 //       that changed to happen in cleanup legitimate errors would not destroy the
 | |
|                 //       stream. There are change streams test specifically test these cases.
 | |
|                 if (err.message.match(/operation was interrupted/)) {
 | |
|                     return this.push(null);
 | |
|                 }
 | |
|                 // NOTE: The two above checks on the message of the error will cause a null to be pushed
 | |
|                 //       to the stream, thus closing the stream before the destroy call happens. This means
 | |
|                 //       that either of those error messages on a change stream will not get a proper
 | |
|                 //       'error' event to be emitted (the error passed to destroy). Change stream resumability
 | |
|                 //       relies on that error event to be emitted to create its new cursor and thus was not
 | |
|                 //       working on 4.4 servers because the error emitted on failover was "interrupted at
 | |
|                 //       shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down".
 | |
|                 //       See NODE-4475.
 | |
|                 return this.destroy(err);
 | |
|             }
 | |
|             if (result == null) {
 | |
|                 this.push(null);
 | |
|             }
 | |
|             else if (this.destroyed) {
 | |
|                 this._cursor.close().catch(() => null);
 | |
|             }
 | |
|             else {
 | |
|                 if (this.push(result)) {
 | |
|                     return this._readNext();
 | |
|                 }
 | |
|                 this._readInProgress = false;
 | |
|             }
 | |
|         });
 | |
|     }
 | |
| }
 | |
| //# sourceMappingURL=abstract_cursor.js.map
 |