You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					367 lines
				
				15 KiB
			
		
		
			
		
	
	
					367 lines
				
				15 KiB
			| 
											3 years ago
										 | "use strict"; | ||
|  | Object.defineProperty(exports, "__esModule", { value: true }); | ||
|  | exports.Server = void 0; | ||
|  | const connection_1 = require("../cmap/connection"); | ||
|  | const connection_pool_1 = require("../cmap/connection_pool"); | ||
|  | const errors_1 = require("../cmap/errors"); | ||
|  | const constants_1 = require("../constants"); | ||
|  | const error_1 = require("../error"); | ||
|  | const mongo_types_1 = require("../mongo_types"); | ||
|  | const transactions_1 = require("../transactions"); | ||
|  | const utils_1 = require("../utils"); | ||
|  | const common_1 = require("./common"); | ||
|  | const monitor_1 = require("./monitor"); | ||
|  | const server_description_1 = require("./server_description"); | ||
|  | const stateTransition = (0, utils_1.makeStateMachine)({ | ||
|  |     [common_1.STATE_CLOSED]: [common_1.STATE_CLOSED, common_1.STATE_CONNECTING], | ||
|  |     [common_1.STATE_CONNECTING]: [common_1.STATE_CONNECTING, common_1.STATE_CLOSING, common_1.STATE_CONNECTED, common_1.STATE_CLOSED], | ||
|  |     [common_1.STATE_CONNECTED]: [common_1.STATE_CONNECTED, common_1.STATE_CLOSING, common_1.STATE_CLOSED], | ||
|  |     [common_1.STATE_CLOSING]: [common_1.STATE_CLOSING, common_1.STATE_CLOSED] | ||
|  | }); | ||
|  | /** @internal */ | ||
|  | const kMonitor = Symbol('monitor'); | ||
|  | /** @internal */ | ||
|  | class Server extends mongo_types_1.TypedEventEmitter { | ||
|  |     /** | ||
|  |      * Create a server | ||
|  |      */ | ||
|  |     constructor(topology, description, options) { | ||
|  |         super(); | ||
|  |         this.serverApi = options.serverApi; | ||
|  |         const poolOptions = { hostAddress: description.hostAddress, ...options }; | ||
|  |         this.s = { | ||
|  |             description, | ||
|  |             options, | ||
|  |             state: common_1.STATE_CLOSED, | ||
|  |             topology, | ||
|  |             pool: new connection_pool_1.ConnectionPool(this, poolOptions), | ||
|  |             operationCount: 0 | ||
|  |         }; | ||
|  |         for (const event of [...constants_1.CMAP_EVENTS, ...constants_1.APM_EVENTS]) { | ||
|  |             this.s.pool.on(event, (e) => this.emit(event, e)); | ||
|  |         } | ||
|  |         this.s.pool.on(connection_1.Connection.CLUSTER_TIME_RECEIVED, (clusterTime) => { | ||
|  |             this.clusterTime = clusterTime; | ||
|  |         }); | ||
|  |         if (this.loadBalanced) { | ||
|  |             this[kMonitor] = null; | ||
|  |             // monitoring is disabled in load balancing mode
 | ||
|  |             return; | ||
|  |         } | ||
|  |         // create the monitor
 | ||
|  |         // TODO(NODE-4144): Remove new variable for type narrowing
 | ||
|  |         const monitor = new monitor_1.Monitor(this, this.s.options); | ||
|  |         this[kMonitor] = monitor; | ||
|  |         for (const event of constants_1.HEARTBEAT_EVENTS) { | ||
|  |             monitor.on(event, (e) => this.emit(event, e)); | ||
|  |         } | ||
|  |         monitor.on('resetServer', (error) => markServerUnknown(this, error)); | ||
|  |         monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event) => { | ||
|  |             this.emit(Server.DESCRIPTION_RECEIVED, new server_description_1.ServerDescription(this.description.hostAddress, event.reply, { | ||
|  |                 roundTripTime: calculateRoundTripTime(this.description.roundTripTime, event.duration) | ||
|  |             })); | ||
|  |             if (this.s.state === common_1.STATE_CONNECTING) { | ||
|  |                 stateTransition(this, common_1.STATE_CONNECTED); | ||
|  |                 this.emit(Server.CONNECT, this); | ||
|  |             } | ||
|  |         }); | ||
|  |     } | ||
|  |     get clusterTime() { | ||
|  |         return this.s.topology.clusterTime; | ||
|  |     } | ||
|  |     set clusterTime(clusterTime) { | ||
|  |         this.s.topology.clusterTime = clusterTime; | ||
|  |     } | ||
|  |     get description() { | ||
|  |         return this.s.description; | ||
|  |     } | ||
|  |     get name() { | ||
|  |         return this.s.description.address; | ||
|  |     } | ||
|  |     get autoEncrypter() { | ||
|  |         if (this.s.options && this.s.options.autoEncrypter) { | ||
|  |             return this.s.options.autoEncrypter; | ||
|  |         } | ||
|  |         return; | ||
|  |     } | ||
|  |     get loadBalanced() { | ||
|  |         return this.s.topology.description.type === common_1.TopologyType.LoadBalanced; | ||
|  |     } | ||
|  |     /** | ||
|  |      * Initiate server connect | ||
|  |      */ | ||
|  |     connect() { | ||
|  |         if (this.s.state !== common_1.STATE_CLOSED) { | ||
|  |             return; | ||
|  |         } | ||
|  |         stateTransition(this, common_1.STATE_CONNECTING); | ||
|  |         // If in load balancer mode we automatically set the server to
 | ||
|  |         // a load balancer. It never transitions out of this state and
 | ||
|  |         // has no monitor.
 | ||
|  |         if (!this.loadBalanced) { | ||
|  |             this[kMonitor]?.connect(); | ||
|  |         } | ||
|  |         else { | ||
|  |             stateTransition(this, common_1.STATE_CONNECTED); | ||
|  |             this.emit(Server.CONNECT, this); | ||
|  |         } | ||
|  |     } | ||
|  |     /** Destroy the server connection */ | ||
|  |     destroy(options, callback) { | ||
|  |         if (typeof options === 'function') | ||
|  |             (callback = options), (options = {}); | ||
|  |         options = Object.assign({}, { force: false }, options); | ||
|  |         if (this.s.state === common_1.STATE_CLOSED) { | ||
|  |             if (typeof callback === 'function') { | ||
|  |                 callback(); | ||
|  |             } | ||
|  |             return; | ||
|  |         } | ||
|  |         stateTransition(this, common_1.STATE_CLOSING); | ||
|  |         if (!this.loadBalanced) { | ||
|  |             this[kMonitor]?.close(); | ||
|  |         } | ||
|  |         this.s.pool.close(options, err => { | ||
|  |             stateTransition(this, common_1.STATE_CLOSED); | ||
|  |             this.emit('closed'); | ||
|  |             if (typeof callback === 'function') { | ||
|  |                 callback(err); | ||
|  |             } | ||
|  |         }); | ||
|  |     } | ||
|  |     /** | ||
|  |      * Immediately schedule monitoring of this server. If there already an attempt being made | ||
|  |      * this will be a no-op. | ||
|  |      */ | ||
|  |     requestCheck() { | ||
|  |         if (!this.loadBalanced) { | ||
|  |             this[kMonitor]?.requestCheck(); | ||
|  |         } | ||
|  |     } | ||
|  |     /** | ||
|  |      * Execute a command | ||
|  |      * @internal | ||
|  |      */ | ||
|  |     command(ns, cmd, options, callback) { | ||
|  |         if (callback == null) { | ||
|  |             throw new error_1.MongoInvalidArgumentError('Callback must be provided'); | ||
|  |         } | ||
|  |         if (ns.db == null || typeof ns === 'string') { | ||
|  |             throw new error_1.MongoInvalidArgumentError('Namespace must not be a string'); | ||
|  |         } | ||
|  |         if (this.s.state === common_1.STATE_CLOSING || this.s.state === common_1.STATE_CLOSED) { | ||
|  |             callback(new error_1.MongoServerClosedError()); | ||
|  |             return; | ||
|  |         } | ||
|  |         // Clone the options
 | ||
|  |         const finalOptions = Object.assign({}, options, { wireProtocolCommand: false }); | ||
|  |         // There are cases where we need to flag the read preference not to get sent in
 | ||
|  |         // the command, such as pre-5.0 servers attempting to perform an aggregate write
 | ||
|  |         // with a non-primary read preference. In this case the effective read preference
 | ||
|  |         // (primary) is not the same as the provided and must be removed completely.
 | ||
|  |         if (finalOptions.omitReadPreference) { | ||
|  |             delete finalOptions.readPreference; | ||
|  |         } | ||
|  |         const session = finalOptions.session; | ||
|  |         const conn = session?.pinnedConnection; | ||
|  |         // NOTE: This is a hack! We can't retrieve the connections used for executing an operation
 | ||
|  |         //       (and prevent them from being checked back in) at the point of operation execution.
 | ||
|  |         //       This should be considered as part of the work for NODE-2882
 | ||
|  |         // NOTE:
 | ||
|  |         //       When incrementing operation count, it's important that we increment it before we
 | ||
|  |         //       attempt to check out a connection from the pool.  This ensures that operations that
 | ||
|  |         //       are waiting for a connection are included in the operation count.  Load balanced
 | ||
|  |         //       mode will only ever have a single server, so the operation count doesn't matter.
 | ||
|  |         //       Incrementing the operation count above the logic to handle load balanced mode would
 | ||
|  |         //       require special logic to decrement it again, or would double increment (the load
 | ||
|  |         //       balanced code makes a recursive call).  Instead, we increment the count after this
 | ||
|  |         //       check.
 | ||
|  |         if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) { | ||
|  |             this.s.pool.checkOut((err, checkedOut) => { | ||
|  |                 if (err || checkedOut == null) { | ||
|  |                     if (callback) | ||
|  |                         return callback(err); | ||
|  |                     return; | ||
|  |                 } | ||
|  |                 session.pin(checkedOut); | ||
|  |                 this.command(ns, cmd, finalOptions, callback); | ||
|  |             }); | ||
|  |             return; | ||
|  |         } | ||
|  |         this.s.operationCount += 1; | ||
|  |         this.s.pool.withConnection(conn, (err, conn, cb) => { | ||
|  |             if (err || !conn) { | ||
|  |                 this.s.operationCount -= 1; | ||
|  |                 if (!err) { | ||
|  |                     return cb(new error_1.MongoRuntimeError('Failed to create connection without error')); | ||
|  |                 } | ||
|  |                 if (!(err instanceof errors_1.PoolClearedError)) { | ||
|  |                     this.handleError(err); | ||
|  |                 } | ||
|  |                 return cb(err); | ||
|  |             } | ||
|  |             conn.command(ns, cmd, finalOptions, makeOperationHandler(this, conn, cmd, finalOptions, (error, response) => { | ||
|  |                 this.s.operationCount -= 1; | ||
|  |                 cb(error, response); | ||
|  |             })); | ||
|  |         }, callback); | ||
|  |     } | ||
|  |     /** | ||
|  |      * Handle SDAM error | ||
|  |      * @internal | ||
|  |      */ | ||
|  |     handleError(error, connection) { | ||
|  |         if (!(error instanceof error_1.MongoError)) { | ||
|  |             return; | ||
|  |         } | ||
|  |         const isStaleError = error.connectionGeneration && error.connectionGeneration < this.s.pool.generation; | ||
|  |         if (isStaleError) { | ||
|  |             return; | ||
|  |         } | ||
|  |         const isNetworkNonTimeoutError = error instanceof error_1.MongoNetworkError && !(error instanceof error_1.MongoNetworkTimeoutError); | ||
|  |         const isNetworkTimeoutBeforeHandshakeError = (0, error_1.isNetworkErrorBeforeHandshake)(error); | ||
|  |         const isAuthHandshakeError = error.hasErrorLabel(error_1.MongoErrorLabel.HandshakeError); | ||
|  |         if (isNetworkNonTimeoutError || isNetworkTimeoutBeforeHandshakeError || isAuthHandshakeError) { | ||
|  |             // In load balanced mode we never mark the server as unknown and always
 | ||
|  |             // clear for the specific service id.
 | ||
|  |             if (!this.loadBalanced) { | ||
|  |                 error.addErrorLabel(error_1.MongoErrorLabel.ResetPool); | ||
|  |                 markServerUnknown(this, error); | ||
|  |             } | ||
|  |             else if (connection) { | ||
|  |                 this.s.pool.clear({ serviceId: connection.serviceId }); | ||
|  |             } | ||
|  |         } | ||
|  |         else { | ||
|  |             if ((0, error_1.isSDAMUnrecoverableError)(error)) { | ||
|  |                 if (shouldHandleStateChangeError(this, error)) { | ||
|  |                     const shouldClearPool = (0, utils_1.maxWireVersion)(this) <= 7 || (0, error_1.isNodeShuttingDownError)(error); | ||
|  |                     if (this.loadBalanced && connection && shouldClearPool) { | ||
|  |                         this.s.pool.clear({ serviceId: connection.serviceId }); | ||
|  |                     } | ||
|  |                     if (!this.loadBalanced) { | ||
|  |                         if (shouldClearPool) { | ||
|  |                             error.addErrorLabel(error_1.MongoErrorLabel.ResetPool); | ||
|  |                         } | ||
|  |                         markServerUnknown(this, error); | ||
|  |                         process.nextTick(() => this.requestCheck()); | ||
|  |                     } | ||
|  |                 } | ||
|  |             } | ||
|  |         } | ||
|  |     } | ||
|  | } | ||
|  | exports.Server = Server; | ||
|  | /** @event */ | ||
|  | Server.SERVER_HEARTBEAT_STARTED = constants_1.SERVER_HEARTBEAT_STARTED; | ||
|  | /** @event */ | ||
|  | Server.SERVER_HEARTBEAT_SUCCEEDED = constants_1.SERVER_HEARTBEAT_SUCCEEDED; | ||
|  | /** @event */ | ||
|  | Server.SERVER_HEARTBEAT_FAILED = constants_1.SERVER_HEARTBEAT_FAILED; | ||
|  | /** @event */ | ||
|  | Server.CONNECT = constants_1.CONNECT; | ||
|  | /** @event */ | ||
|  | Server.DESCRIPTION_RECEIVED = constants_1.DESCRIPTION_RECEIVED; | ||
|  | /** @event */ | ||
|  | Server.CLOSED = constants_1.CLOSED; | ||
|  | /** @event */ | ||
|  | Server.ENDED = constants_1.ENDED; | ||
|  | function calculateRoundTripTime(oldRtt, duration) { | ||
|  |     if (oldRtt === -1) { | ||
|  |         return duration; | ||
|  |     } | ||
|  |     const alpha = 0.2; | ||
|  |     return alpha * duration + (1 - alpha) * oldRtt; | ||
|  | } | ||
|  | function markServerUnknown(server, error) { | ||
|  |     // Load balancer servers can never be marked unknown.
 | ||
|  |     if (server.loadBalanced) { | ||
|  |         return; | ||
|  |     } | ||
|  |     if (error instanceof error_1.MongoNetworkError && !(error instanceof error_1.MongoNetworkTimeoutError)) { | ||
|  |         server[kMonitor]?.reset(); | ||
|  |     } | ||
|  |     server.emit(Server.DESCRIPTION_RECEIVED, new server_description_1.ServerDescription(server.description.hostAddress, undefined, { error })); | ||
|  | } | ||
|  | function isPinnableCommand(cmd, session) { | ||
|  |     if (session) { | ||
|  |         return (session.inTransaction() || | ||
|  |             'aggregate' in cmd || | ||
|  |             'find' in cmd || | ||
|  |             'getMore' in cmd || | ||
|  |             'listCollections' in cmd || | ||
|  |             'listIndexes' in cmd); | ||
|  |     } | ||
|  |     return false; | ||
|  | } | ||
|  | function connectionIsStale(pool, connection) { | ||
|  |     if (connection.serviceId) { | ||
|  |         return (connection.generation !== pool.serviceGenerations.get(connection.serviceId.toHexString())); | ||
|  |     } | ||
|  |     return connection.generation !== pool.generation; | ||
|  | } | ||
|  | function shouldHandleStateChangeError(server, err) { | ||
|  |     const etv = err.topologyVersion; | ||
|  |     const stv = server.description.topologyVersion; | ||
|  |     return (0, server_description_1.compareTopologyVersion)(stv, etv) < 0; | ||
|  | } | ||
|  | function inActiveTransaction(session, cmd) { | ||
|  |     return session && session.inTransaction() && !(0, transactions_1.isTransactionCommand)(cmd); | ||
|  | } | ||
|  | /** this checks the retryWrites option passed down from the client options, it | ||
|  |  * does not check if the server supports retryable writes */ | ||
|  | function isRetryableWritesEnabled(topology) { | ||
|  |     return topology.s.options.retryWrites !== false; | ||
|  | } | ||
|  | function makeOperationHandler(server, connection, cmd, options, callback) { | ||
|  |     const session = options?.session; | ||
|  |     return function handleOperationResult(error, result) { | ||
|  |         // We should not swallow an error if it is present.
 | ||
|  |         if (error == null && result != null) { | ||
|  |             return callback(undefined, result); | ||
|  |         } | ||
|  |         if (options != null && 'noResponse' in options && options.noResponse === true) { | ||
|  |             return callback(undefined, null); | ||
|  |         } | ||
|  |         if (!error) { | ||
|  |             return callback(new error_1.MongoUnexpectedServerResponseError('Empty response with no error')); | ||
|  |         } | ||
|  |         if (!(error instanceof error_1.MongoError)) { | ||
|  |             // Node.js or some other error we have not special handling for
 | ||
|  |             return callback(error); | ||
|  |         } | ||
|  |         if (connectionIsStale(server.s.pool, connection)) { | ||
|  |             return callback(error); | ||
|  |         } | ||
|  |         if (error instanceof error_1.MongoNetworkError) { | ||
|  |             if (session && !session.hasEnded && session.serverSession) { | ||
|  |                 session.serverSession.isDirty = true; | ||
|  |             } | ||
|  |             // inActiveTransaction check handles commit and abort.
 | ||
|  |             if (inActiveTransaction(session, cmd) && | ||
|  |                 !error.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) { | ||
|  |                 error.addErrorLabel(error_1.MongoErrorLabel.TransientTransactionError); | ||
|  |             } | ||
|  |             if ((isRetryableWritesEnabled(server.s.topology) || (0, transactions_1.isTransactionCommand)(cmd)) && | ||
|  |                 (0, utils_1.supportsRetryableWrites)(server) && | ||
|  |                 !inActiveTransaction(session, cmd)) { | ||
|  |                 error.addErrorLabel(error_1.MongoErrorLabel.RetryableWriteError); | ||
|  |             } | ||
|  |         } | ||
|  |         else { | ||
|  |             if ((isRetryableWritesEnabled(server.s.topology) || (0, transactions_1.isTransactionCommand)(cmd)) && | ||
|  |                 (0, error_1.needsRetryableWriteLabel)(error, (0, utils_1.maxWireVersion)(server)) && | ||
|  |                 !inActiveTransaction(session, cmd)) { | ||
|  |                 error.addErrorLabel(error_1.MongoErrorLabel.RetryableWriteError); | ||
|  |             } | ||
|  |         } | ||
|  |         if (session && | ||
|  |             session.isPinned && | ||
|  |             error.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) { | ||
|  |             session.unpin({ force: true }); | ||
|  |         } | ||
|  |         server.handleError(error, connection); | ||
|  |         return callback(error); | ||
|  |     }; | ||
|  | } | ||
|  | //# sourceMappingURL=server.js.map
 |