You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							498 lines
						
					
					
						
							20 KiB
						
					
					
				
			
		
		
	
	
							498 lines
						
					
					
						
							20 KiB
						
					
					
				| "use strict";
 | |
| Object.defineProperty(exports, "__esModule", { value: true });
 | |
| exports.hasSessionSupport = exports.CryptoConnection = exports.Connection = void 0;
 | |
| const timers_1 = require("timers");
 | |
| const constants_1 = require("../constants");
 | |
| const error_1 = require("../error");
 | |
| const mongo_types_1 = require("../mongo_types");
 | |
| const sessions_1 = require("../sessions");
 | |
| const utils_1 = require("../utils");
 | |
| const command_monitoring_events_1 = require("./command_monitoring_events");
 | |
| const commands_1 = require("./commands");
 | |
| const message_stream_1 = require("./message_stream");
 | |
| const stream_description_1 = require("./stream_description");
 | |
| const shared_1 = require("./wire_protocol/shared");
 | |
| /** @internal */
 | |
| const kStream = Symbol('stream');
 | |
| /** @internal */
 | |
| const kQueue = Symbol('queue');
 | |
| /** @internal */
 | |
| const kMessageStream = Symbol('messageStream');
 | |
| /** @internal */
 | |
| const kGeneration = Symbol('generation');
 | |
| /** @internal */
 | |
| const kLastUseTime = Symbol('lastUseTime');
 | |
| /** @internal */
 | |
| const kClusterTime = Symbol('clusterTime');
 | |
| /** @internal */
 | |
| const kDescription = Symbol('description');
 | |
| /** @internal */
 | |
| const kHello = Symbol('hello');
 | |
| /** @internal */
 | |
| const kAutoEncrypter = Symbol('autoEncrypter');
 | |
| /** @internal */
 | |
| const kDelayedTimeoutId = Symbol('delayedTimeoutId');
 | |
| const INVALID_QUEUE_SIZE = 'Connection internal queue contains more than 1 operation description';
 | |
| /** @internal */
 | |
| class Connection extends mongo_types_1.TypedEventEmitter {
 | |
|     constructor(stream, options) {
 | |
|         super();
 | |
|         this.id = options.id;
 | |
|         this.address = streamIdentifier(stream, options);
 | |
|         this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
 | |
|         this.monitorCommands = options.monitorCommands;
 | |
|         this.serverApi = options.serverApi;
 | |
|         this.closed = false;
 | |
|         this.destroyed = false;
 | |
|         this[kHello] = null;
 | |
|         this[kClusterTime] = null;
 | |
|         this[kDescription] = new stream_description_1.StreamDescription(this.address, options);
 | |
|         this[kGeneration] = options.generation;
 | |
|         this[kLastUseTime] = (0, utils_1.now)();
 | |
|         // setup parser stream and message handling
 | |
|         this[kQueue] = new Map();
 | |
|         this[kMessageStream] = new message_stream_1.MessageStream({
 | |
|             ...options,
 | |
|             maxBsonMessageSize: this.hello?.maxBsonMessageSize
 | |
|         });
 | |
|         this[kStream] = stream;
 | |
|         this[kDelayedTimeoutId] = null;
 | |
|         this[kMessageStream].on('message', message => this.onMessage(message));
 | |
|         this[kMessageStream].on('error', error => this.onError(error));
 | |
|         this[kStream].on('close', () => this.onClose());
 | |
|         this[kStream].on('timeout', () => this.onTimeout());
 | |
|         this[kStream].on('error', () => {
 | |
|             /* ignore errors, listen to `close` instead */
 | |
|         });
 | |
|         // hook the message stream up to the passed in stream
 | |
|         this[kStream].pipe(this[kMessageStream]);
 | |
|         this[kMessageStream].pipe(this[kStream]);
 | |
|     }
 | |
|     get description() {
 | |
|         return this[kDescription];
 | |
|     }
 | |
|     get hello() {
 | |
|         return this[kHello];
 | |
|     }
 | |
|     // the `connect` method stores the result of the handshake hello on the connection
 | |
|     set hello(response) {
 | |
|         this[kDescription].receiveResponse(response);
 | |
|         this[kDescription] = Object.freeze(this[kDescription]);
 | |
|         // TODO: remove this, and only use the `StreamDescription` in the future
 | |
|         this[kHello] = response;
 | |
|     }
 | |
|     // Set the whether the message stream is for a monitoring connection.
 | |
|     set isMonitoringConnection(value) {
 | |
|         this[kMessageStream].isMonitoringConnection = value;
 | |
|     }
 | |
|     get isMonitoringConnection() {
 | |
|         return this[kMessageStream].isMonitoringConnection;
 | |
|     }
 | |
|     get serviceId() {
 | |
|         return this.hello?.serviceId;
 | |
|     }
 | |
|     get loadBalanced() {
 | |
|         return this.description.loadBalanced;
 | |
|     }
 | |
|     get generation() {
 | |
|         return this[kGeneration] || 0;
 | |
|     }
 | |
|     set generation(generation) {
 | |
|         this[kGeneration] = generation;
 | |
|     }
 | |
|     get idleTime() {
 | |
|         return (0, utils_1.calculateDurationInMs)(this[kLastUseTime]);
 | |
|     }
 | |
|     get clusterTime() {
 | |
|         return this[kClusterTime];
 | |
|     }
 | |
|     get stream() {
 | |
|         return this[kStream];
 | |
|     }
 | |
|     markAvailable() {
 | |
|         this[kLastUseTime] = (0, utils_1.now)();
 | |
|     }
 | |
|     onError(error) {
 | |
|         if (this.closed) {
 | |
|             return;
 | |
|         }
 | |
|         this[kStream].destroy(error);
 | |
|         this.closed = true;
 | |
|         for (const op of this[kQueue].values()) {
 | |
|             op.cb(error);
 | |
|         }
 | |
|         this[kQueue].clear();
 | |
|         this.emit(Connection.CLOSE);
 | |
|     }
 | |
|     onClose() {
 | |
|         if (this.closed) {
 | |
|             return;
 | |
|         }
 | |
|         this.closed = true;
 | |
|         const message = `connection ${this.id} to ${this.address} closed`;
 | |
|         for (const op of this[kQueue].values()) {
 | |
|             op.cb(new error_1.MongoNetworkError(message));
 | |
|         }
 | |
|         this[kQueue].clear();
 | |
|         this.emit(Connection.CLOSE);
 | |
|     }
 | |
|     onTimeout() {
 | |
|         if (this.closed) {
 | |
|             return;
 | |
|         }
 | |
|         this[kDelayedTimeoutId] = (0, timers_1.setTimeout)(() => {
 | |
|             this[kStream].destroy();
 | |
|             this.closed = true;
 | |
|             const message = `connection ${this.id} to ${this.address} timed out`;
 | |
|             const beforeHandshake = this.hello == null;
 | |
|             for (const op of this[kQueue].values()) {
 | |
|                 op.cb(new error_1.MongoNetworkTimeoutError(message, { beforeHandshake }));
 | |
|             }
 | |
|             this[kQueue].clear();
 | |
|             this.emit(Connection.CLOSE);
 | |
|         }, 1).unref(); // No need for this timer to hold the event loop open
 | |
|     }
 | |
|     onMessage(message) {
 | |
|         const delayedTimeoutId = this[kDelayedTimeoutId];
 | |
|         if (delayedTimeoutId != null) {
 | |
|             (0, timers_1.clearTimeout)(delayedTimeoutId);
 | |
|             this[kDelayedTimeoutId] = null;
 | |
|         }
 | |
|         // always emit the message, in case we are streaming
 | |
|         this.emit('message', message);
 | |
|         let operationDescription = this[kQueue].get(message.responseTo);
 | |
|         if (!operationDescription && this.isMonitoringConnection) {
 | |
|             // This is how we recover when the initial hello's requestId is not
 | |
|             // the responseTo when hello responses have been skipped:
 | |
|             // First check if the map is of invalid size
 | |
|             if (this[kQueue].size > 1) {
 | |
|                 this.onError(new error_1.MongoRuntimeError(INVALID_QUEUE_SIZE));
 | |
|             }
 | |
|             else {
 | |
|                 // Get the first orphaned operation description.
 | |
|                 const entry = this[kQueue].entries().next();
 | |
|                 if (entry.value != null) {
 | |
|                     const [requestId, orphaned] = entry.value;
 | |
|                     // If the orphaned operation description exists then set it.
 | |
|                     operationDescription = orphaned;
 | |
|                     // Remove the entry with the bad request id from the queue.
 | |
|                     this[kQueue].delete(requestId);
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
|         if (!operationDescription) {
 | |
|             return;
 | |
|         }
 | |
|         const callback = operationDescription.cb;
 | |
|         // SERVER-45775: For exhaust responses we should be able to use the same requestId to
 | |
|         // track response, however the server currently synthetically produces remote requests
 | |
|         // making the `responseTo` change on each response
 | |
|         this[kQueue].delete(message.responseTo);
 | |
|         if ('moreToCome' in message && message.moreToCome) {
 | |
|             // If the operation description check above does find an orphaned
 | |
|             // description and sets the operationDescription then this line will put one
 | |
|             // back in the queue with the correct requestId and will resolve not being able
 | |
|             // to find the next one via the responseTo of the next streaming hello.
 | |
|             this[kQueue].set(message.requestId, operationDescription);
 | |
|         }
 | |
|         else if (operationDescription.socketTimeoutOverride) {
 | |
|             this[kStream].setTimeout(this.socketTimeoutMS);
 | |
|         }
 | |
|         try {
 | |
|             // Pass in the entire description because it has BSON parsing options
 | |
|             message.parse(operationDescription);
 | |
|         }
 | |
|         catch (err) {
 | |
|             // If this error is generated by our own code, it will already have the correct class applied
 | |
|             // if it is not, then it is coming from a catastrophic data parse failure or the BSON library
 | |
|             // in either case, it should not be wrapped
 | |
|             callback(err);
 | |
|             return;
 | |
|         }
 | |
|         if (message.documents[0]) {
 | |
|             const document = message.documents[0];
 | |
|             const session = operationDescription.session;
 | |
|             if (session) {
 | |
|                 (0, sessions_1.updateSessionFromResponse)(session, document);
 | |
|             }
 | |
|             if (document.$clusterTime) {
 | |
|                 this[kClusterTime] = document.$clusterTime;
 | |
|                 this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
 | |
|             }
 | |
|             if (operationDescription.command) {
 | |
|                 if (document.writeConcernError) {
 | |
|                     callback(new error_1.MongoWriteConcernError(document.writeConcernError, document), document);
 | |
|                     return;
 | |
|                 }
 | |
|                 if (document.ok === 0 || document.$err || document.errmsg || document.code) {
 | |
|                     callback(new error_1.MongoServerError(document));
 | |
|                     return;
 | |
|                 }
 | |
|             }
 | |
|             else {
 | |
|                 // Pre 3.2 support
 | |
|                 if (document.ok === 0 || document.$err || document.errmsg) {
 | |
|                     callback(new error_1.MongoServerError(document));
 | |
|                     return;
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
|         callback(undefined, message.documents[0]);
 | |
|     }
 | |
|     destroy(options, callback) {
 | |
|         if (typeof options === 'function') {
 | |
|             callback = options;
 | |
|             options = { force: false };
 | |
|         }
 | |
|         this.removeAllListeners(Connection.PINNED);
 | |
|         this.removeAllListeners(Connection.UNPINNED);
 | |
|         options = Object.assign({ force: false }, options);
 | |
|         if (this[kStream] == null || this.destroyed) {
 | |
|             this.destroyed = true;
 | |
|             if (typeof callback === 'function') {
 | |
|                 callback();
 | |
|             }
 | |
|             return;
 | |
|         }
 | |
|         if (options.force) {
 | |
|             this[kStream].destroy();
 | |
|             this.destroyed = true;
 | |
|             if (typeof callback === 'function') {
 | |
|                 callback();
 | |
|             }
 | |
|             return;
 | |
|         }
 | |
|         this[kStream].end(() => {
 | |
|             this.destroyed = true;
 | |
|             if (typeof callback === 'function') {
 | |
|                 callback();
 | |
|             }
 | |
|         });
 | |
|     }
 | |
|     command(ns, cmd, options, callback) {
 | |
|         const readPreference = (0, shared_1.getReadPreference)(cmd, options);
 | |
|         const shouldUseOpMsg = supportsOpMsg(this);
 | |
|         const session = options?.session;
 | |
|         let clusterTime = this.clusterTime;
 | |
|         let finalCmd = Object.assign({}, cmd);
 | |
|         if (this.serverApi) {
 | |
|             const { version, strict, deprecationErrors } = this.serverApi;
 | |
|             finalCmd.apiVersion = version;
 | |
|             if (strict != null)
 | |
|                 finalCmd.apiStrict = strict;
 | |
|             if (deprecationErrors != null)
 | |
|                 finalCmd.apiDeprecationErrors = deprecationErrors;
 | |
|         }
 | |
|         if (hasSessionSupport(this) && session) {
 | |
|             if (session.clusterTime &&
 | |
|                 clusterTime &&
 | |
|                 session.clusterTime.clusterTime.greaterThan(clusterTime.clusterTime)) {
 | |
|                 clusterTime = session.clusterTime;
 | |
|             }
 | |
|             const err = (0, sessions_1.applySession)(session, finalCmd, options);
 | |
|             if (err) {
 | |
|                 return callback(err);
 | |
|             }
 | |
|         }
 | |
|         // if we have a known cluster time, gossip it
 | |
|         if (clusterTime) {
 | |
|             finalCmd.$clusterTime = clusterTime;
 | |
|         }
 | |
|         if ((0, shared_1.isSharded)(this) && !shouldUseOpMsg && readPreference && readPreference.mode !== 'primary') {
 | |
|             finalCmd = {
 | |
|                 $query: finalCmd,
 | |
|                 $readPreference: readPreference.toJSON()
 | |
|             };
 | |
|         }
 | |
|         const commandOptions = Object.assign({
 | |
|             command: true,
 | |
|             numberToSkip: 0,
 | |
|             numberToReturn: -1,
 | |
|             checkKeys: false,
 | |
|             // This value is not overridable
 | |
|             secondaryOk: readPreference.secondaryOk()
 | |
|         }, options);
 | |
|         const cmdNs = `${ns.db}.$cmd`;
 | |
|         const message = shouldUseOpMsg
 | |
|             ? new commands_1.Msg(cmdNs, finalCmd, commandOptions)
 | |
|             : new commands_1.Query(cmdNs, finalCmd, commandOptions);
 | |
|         try {
 | |
|             write(this, message, commandOptions, callback);
 | |
|         }
 | |
|         catch (err) {
 | |
|             callback(err);
 | |
|         }
 | |
|     }
 | |
| }
 | |
| exports.Connection = Connection;
 | |
| /** @event */
 | |
| Connection.COMMAND_STARTED = constants_1.COMMAND_STARTED;
 | |
| /** @event */
 | |
| Connection.COMMAND_SUCCEEDED = constants_1.COMMAND_SUCCEEDED;
 | |
| /** @event */
 | |
| Connection.COMMAND_FAILED = constants_1.COMMAND_FAILED;
 | |
| /** @event */
 | |
| Connection.CLUSTER_TIME_RECEIVED = constants_1.CLUSTER_TIME_RECEIVED;
 | |
| /** @event */
 | |
| Connection.CLOSE = constants_1.CLOSE;
 | |
| /** @event */
 | |
| Connection.MESSAGE = constants_1.MESSAGE;
 | |
| /** @event */
 | |
| Connection.PINNED = constants_1.PINNED;
 | |
| /** @event */
 | |
| Connection.UNPINNED = constants_1.UNPINNED;
 | |
| /** @internal */
 | |
| class CryptoConnection extends Connection {
 | |
|     constructor(stream, options) {
 | |
|         super(stream, options);
 | |
|         this[kAutoEncrypter] = options.autoEncrypter;
 | |
|     }
 | |
|     /** @internal @override */
 | |
|     command(ns, cmd, options, callback) {
 | |
|         const autoEncrypter = this[kAutoEncrypter];
 | |
|         if (!autoEncrypter) {
 | |
|             return callback(new error_1.MongoMissingDependencyError('No AutoEncrypter available for encryption'));
 | |
|         }
 | |
|         const serverWireVersion = (0, utils_1.maxWireVersion)(this);
 | |
|         if (serverWireVersion === 0) {
 | |
|             // This means the initial handshake hasn't happened yet
 | |
|             return super.command(ns, cmd, options, callback);
 | |
|         }
 | |
|         if (serverWireVersion < 8) {
 | |
|             callback(new error_1.MongoCompatibilityError('Auto-encryption requires a minimum MongoDB version of 4.2'));
 | |
|             return;
 | |
|         }
 | |
|         // Save sort or indexKeys based on the command being run
 | |
|         // the encrypt API serializes our JS objects to BSON to pass to the native code layer
 | |
|         // and then deserializes the encrypted result, the protocol level components
 | |
|         // of the command (ex. sort) are then converted to JS objects potentially losing
 | |
|         // import key order information. These fields are never encrypted so we can save the values
 | |
|         // from before the encryption and replace them after encryption has been performed
 | |
|         const sort = cmd.find || cmd.findAndModify ? cmd.sort : null;
 | |
|         const indexKeys = cmd.createIndexes
 | |
|             ? cmd.indexes.map((index) => index.key)
 | |
|             : null;
 | |
|         autoEncrypter.encrypt(ns.toString(), cmd, options, (err, encrypted) => {
 | |
|             if (err || encrypted == null) {
 | |
|                 callback(err, null);
 | |
|                 return;
 | |
|             }
 | |
|             // Replace the saved values
 | |
|             if (sort != null && (cmd.find || cmd.findAndModify)) {
 | |
|                 encrypted.sort = sort;
 | |
|             }
 | |
|             if (indexKeys != null && cmd.createIndexes) {
 | |
|                 for (const [offset, index] of indexKeys.entries()) {
 | |
|                     encrypted.indexes[offset].key = index;
 | |
|                 }
 | |
|             }
 | |
|             super.command(ns, encrypted, options, (err, response) => {
 | |
|                 if (err || response == null) {
 | |
|                     callback(err, response);
 | |
|                     return;
 | |
|                 }
 | |
|                 autoEncrypter.decrypt(response, options, callback);
 | |
|             });
 | |
|         });
 | |
|     }
 | |
| }
 | |
| exports.CryptoConnection = CryptoConnection;
 | |
| /** @internal */
 | |
| function hasSessionSupport(conn) {
 | |
|     const description = conn.description;
 | |
|     return description.logicalSessionTimeoutMinutes != null || !!description.loadBalanced;
 | |
| }
 | |
| exports.hasSessionSupport = hasSessionSupport;
 | |
| function supportsOpMsg(conn) {
 | |
|     const description = conn.description;
 | |
|     if (description == null) {
 | |
|         return false;
 | |
|     }
 | |
|     return (0, utils_1.maxWireVersion)(conn) >= 6 && !description.__nodejs_mock_server__;
 | |
| }
 | |
| function streamIdentifier(stream, options) {
 | |
|     if (options.proxyHost) {
 | |
|         // If proxy options are specified, the properties of `stream` itself
 | |
|         // will not accurately reflect what endpoint this is connected to.
 | |
|         return options.hostAddress.toString();
 | |
|     }
 | |
|     const { remoteAddress, remotePort } = stream;
 | |
|     if (typeof remoteAddress === 'string' && typeof remotePort === 'number') {
 | |
|         return utils_1.HostAddress.fromHostPort(remoteAddress, remotePort).toString();
 | |
|     }
 | |
|     return (0, utils_1.uuidV4)().toString('hex');
 | |
| }
 | |
| function write(conn, command, options, callback) {
 | |
|     options = options ?? {};
 | |
|     const operationDescription = {
 | |
|         requestId: command.requestId,
 | |
|         cb: callback,
 | |
|         session: options.session,
 | |
|         noResponse: typeof options.noResponse === 'boolean' ? options.noResponse : false,
 | |
|         documentsReturnedIn: options.documentsReturnedIn,
 | |
|         command: !!options.command,
 | |
|         // for BSON parsing
 | |
|         promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
 | |
|         promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
 | |
|         promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false,
 | |
|         bsonRegExp: typeof options.bsonRegExp === 'boolean' ? options.bsonRegExp : false,
 | |
|         enableUtf8Validation: typeof options.enableUtf8Validation === 'boolean' ? options.enableUtf8Validation : true,
 | |
|         raw: typeof options.raw === 'boolean' ? options.raw : false,
 | |
|         started: 0
 | |
|     };
 | |
|     if (conn[kDescription] && conn[kDescription].compressor) {
 | |
|         operationDescription.agreedCompressor = conn[kDescription].compressor;
 | |
|         if (conn[kDescription].zlibCompressionLevel) {
 | |
|             operationDescription.zlibCompressionLevel = conn[kDescription].zlibCompressionLevel;
 | |
|         }
 | |
|     }
 | |
|     if (typeof options.socketTimeoutMS === 'number') {
 | |
|         operationDescription.socketTimeoutOverride = true;
 | |
|         conn[kStream].setTimeout(options.socketTimeoutMS);
 | |
|     }
 | |
|     // if command monitoring is enabled we need to modify the callback here
 | |
|     if (conn.monitorCommands) {
 | |
|         conn.emit(Connection.COMMAND_STARTED, new command_monitoring_events_1.CommandStartedEvent(conn, command));
 | |
|         operationDescription.started = (0, utils_1.now)();
 | |
|         operationDescription.cb = (err, reply) => {
 | |
|             // Command monitoring spec states that if ok is 1, then we must always emit
 | |
|             // a command suceeded event, even if there's an error. Write concern errors
 | |
|             // will have an ok: 1 in their reply.
 | |
|             if (err && reply?.ok !== 1) {
 | |
|                 conn.emit(Connection.COMMAND_FAILED, new command_monitoring_events_1.CommandFailedEvent(conn, command, err, operationDescription.started));
 | |
|             }
 | |
|             else {
 | |
|                 if (reply && (reply.ok === 0 || reply.$err)) {
 | |
|                     conn.emit(Connection.COMMAND_FAILED, new command_monitoring_events_1.CommandFailedEvent(conn, command, reply, operationDescription.started));
 | |
|                 }
 | |
|                 else {
 | |
|                     conn.emit(Connection.COMMAND_SUCCEEDED, new command_monitoring_events_1.CommandSucceededEvent(conn, command, reply, operationDescription.started));
 | |
|                 }
 | |
|             }
 | |
|             if (typeof callback === 'function') {
 | |
|                 // Since we're passing through the reply with the write concern error now, we
 | |
|                 // need it not to be provided to the original callback in this case so
 | |
|                 // retryability does not get tricked into thinking the command actually
 | |
|                 // succeeded.
 | |
|                 callback(err, err instanceof error_1.MongoWriteConcernError ? undefined : reply);
 | |
|             }
 | |
|         };
 | |
|     }
 | |
|     if (!operationDescription.noResponse) {
 | |
|         conn[kQueue].set(operationDescription.requestId, operationDescription);
 | |
|     }
 | |
|     try {
 | |
|         conn[kMessageStream].writeCommand(command, operationDescription);
 | |
|     }
 | |
|     catch (e) {
 | |
|         if (!operationDescription.noResponse) {
 | |
|             conn[kQueue].delete(operationDescription.requestId);
 | |
|             operationDescription.cb(e);
 | |
|             return;
 | |
|         }
 | |
|     }
 | |
|     if (operationDescription.noResponse) {
 | |
|         operationDescription.cb();
 | |
|     }
 | |
| }
 | |
| //# sourceMappingURL=connection.js.map
 |