You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							643 lines
						
					
					
						
							28 KiB
						
					
					
				
			
		
		
	
	
							643 lines
						
					
					
						
							28 KiB
						
					
					
				| "use strict";
 | |
| Object.defineProperty(exports, "__esModule", { value: true });
 | |
| exports.ServerCapabilities = exports.Topology = void 0;
 | |
| const timers_1 = require("timers");
 | |
| const util_1 = require("util");
 | |
| const connection_string_1 = require("../connection_string");
 | |
| const constants_1 = require("../constants");
 | |
| const error_1 = require("../error");
 | |
| const mongo_types_1 = require("../mongo_types");
 | |
| const read_preference_1 = require("../read_preference");
 | |
| const utils_1 = require("../utils");
 | |
| const common_1 = require("./common");
 | |
| const events_1 = require("./events");
 | |
| const server_1 = require("./server");
 | |
| const server_description_1 = require("./server_description");
 | |
| const server_selection_1 = require("./server_selection");
 | |
| const srv_polling_1 = require("./srv_polling");
 | |
| const topology_description_1 = require("./topology_description");
 | |
// Global state

// Monotonic counter; each Topology instance takes the next value as its id.
let globalTopologyCounter = 0;

// Legal topology state transitions, keyed by the current state. Each entry
// lists the states that may be entered from that state.
const stateTransition = (0, utils_1.makeStateMachine)({
    [common_1.STATE_CLOSED]: [
        common_1.STATE_CLOSED,
        common_1.STATE_CONNECTING
    ],
    [common_1.STATE_CONNECTING]: [
        common_1.STATE_CONNECTING,
        common_1.STATE_CLOSING,
        common_1.STATE_CONNECTED,
        common_1.STATE_CLOSED
    ],
    [common_1.STATE_CONNECTED]: [
        common_1.STATE_CONNECTED,
        common_1.STATE_CLOSING,
        common_1.STATE_CLOSED
    ],
    [common_1.STATE_CLOSING]: [
        common_1.STATE_CLOSING,
        common_1.STATE_CLOSED
    ]
});

/**
 * Marks a wait-queue member whose selection timer has already fired.
 * @internal
 */
const kCancelled = Symbol('cancelled');

/**
 * Key under which a Topology stores its queue of pending server selections.
 * @internal
 */
const kWaitQueue = Symbol('waitQueue');
 | |
/**
 * A container of server instances representing a connection to a MongoDB topology.
 * @internal
 */
class Topology extends mongo_types_1.TypedEventEmitter {
    /**
     * @param seeds - a host string, a HostAddress, or an array of either, to connect to
     * @param options - topology options; a localhost default is built when nullish
     */
    constructor(seeds, options) {
        super();
        // Promise-returning flavour of the callback based selectServer().
        this.selectServerAsync = (0, util_1.promisify)((selector, selectionOptions, done) => this.selectServer(selector, selectionOptions, done));
        // Options should only be undefined in tests, MongoClient will always have defined options
        options = options ?? {
            hosts: [utils_1.HostAddress.fromString('localhost:27017')],
            ...Object.fromEntries(connection_string_1.DEFAULT_OPTIONS.entries()),
            ...Object.fromEntries(connection_string_1.FEATURE_FLAGS.entries())
        };
        // Normalize the seeds argument into an array.
        let rawSeeds;
        if (typeof seeds === 'string') {
            rawSeeds = [utils_1.HostAddress.fromString(seeds)];
        }
        else if (Array.isArray(seeds)) {
            rawSeeds = seeds;
        }
        else {
            rawSeeds = [seeds];
        }
        // Convert every seed into a HostAddress, rejecting anything else.
        const seedlist = [];
        for (const candidate of rawSeeds) {
            if (candidate instanceof utils_1.HostAddress) {
                seedlist.push(candidate);
                continue;
            }
            if (typeof candidate === 'string') {
                seedlist.push(utils_1.HostAddress.fromString(candidate));
                continue;
            }
            // FIXME(NODE-3483): May need to be a MongoParseError
            throw new error_1.MongoRuntimeError(`Topology cannot be constructed from ${JSON.stringify(candidate)}`);
        }
        const topologyType = topologyTypeFromOptions(options);
        const topologyId = globalTopologyCounter++;
        // srvMaxHosts only limits the seedlist when it is a positive number
        // smaller than the number of seeds.
        const keepAllSeeds = options.srvMaxHosts == null ||
            options.srvMaxHosts === 0 ||
            options.srvMaxHosts >= seedlist.length;
        const selectedHosts = keepAllSeeds
            ? seedlist
            : (0, utils_1.shuffle)(seedlist, options.srvMaxHosts);
        const serverDescriptions = new Map();
        for (const host of selectedHosts) {
            serverDescriptions.set(host.toString(), new server_description_1.ServerDescription(host));
        }
        this[kWaitQueue] = new utils_1.List();
        const description = new topology_description_1.TopologyDescription(topologyType, serverDescriptions, options.replicaSet, undefined, undefined, undefined, options);
        this.s = {
            // the id of this topology
            id: topologyId,
            // passed in options
            options,
            // initial seedlist of servers to connect to
            seedlist,
            // initial state
            state: common_1.STATE_CLOSED,
            // the topology description
            description,
            serverSelectionTimeoutMS: options.serverSelectionTimeoutMS,
            heartbeatFrequencyMS: options.heartbeatFrequencyMS,
            minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS,
            // a map of server instances to normalized addresses
            servers: new Map(),
            credentials: options.credentials,
            clusterTime: undefined,
            // timer management
            connectionTimers: new Set(),
            // stable listener references so they can be removed again on close
            detectShardedTopology: ev => this.detectShardedTopology(ev),
            detectSrvRecords: ev => this.detectSrvRecords(ev)
        };
        const usesSrvPolling = options.srvHost && !options.loadBalanced;
        if (usesSrvPolling) {
            this.s.srvPoller =
                options.srvPoller ??
                    new srv_polling_1.SrvPoller({
                        heartbeatFrequencyMS: this.s.heartbeatFrequencyMS,
                        srvHost: options.srvHost,
                        srvMaxHosts: options.srvMaxHosts,
                        srvServiceName: options.srvServiceName
                    });
            this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
        }
    }
    /**
     * Starts SRV polling the first time the topology type changes to Sharded.
     *
     * @param event - the topology description changed event to inspect
     */
    detectShardedTopology(event) {
        const previousType = event.previousDescription.type;
        const newType = event.newDescription.type;
        const becameSharded = previousType !== common_1.TopologyType.Sharded && newType === common_1.TopologyType.Sharded;
        const registered = this.s.srvPoller?.listeners(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY);
        const alreadyListening = !!registered?.includes(this.s.detectSrvRecords);
        if (!becameSharded || alreadyListening) {
            return;
        }
        this.s.srvPoller?.on(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
        this.s.srvPoller?.start();
    }
    /**
     * Applies an SRV polling event to the topology description and, if that
     * produced a new description, syncs servers and emits the change.
     *
     * @param ev - the SRV record discovery event
     */
    detectSrvRecords(ev) {
        const before = this.s.description;
        this.s.description = before.updateFromSrvPollingEvent(ev, this.s.options.srvMaxHosts);
        const changed = this.s.description !== before;
        if (changed) {
            updateServers(this);
            this.emit(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, before, this.s.description));
        }
    }
    /**
     * @returns A `TopologyDescription` for this topology
     */
    get description() {
        return this.s.description;
    }
    /** @returns the `loadBalanced` option this topology was created with */
    get loadBalanced() {
        return this.s.options.loadBalanced;
    }
    /** @returns a fresh `ServerCapabilities` built from `lastHello()` */
    get capabilities() {
        return new ServerCapabilities(this.lastHello());
    }
    /**
     * Connects all known servers and then performs a server selection to
     * confirm connectivity.
     *
     * @param options - optional settings (read for `readPreference` and passed to selection); may be omitted
     * @param callback - called with `(error)` on failure or `(undefined, topology)` on success
     */
    connect(options, callback) {
        if (typeof options === 'function') {
            callback = options;
            options = {};
        }
        options = options ?? {};
        if (this.s.state === common_1.STATE_CONNECTED) {
            if (typeof callback === 'function') {
                callback();
            }
            return;
        }
        stateTransition(this, common_1.STATE_CONNECTING);
        // emit SDAM monitoring events
        this.emit(Topology.TOPOLOGY_OPENING, new events_1.TopologyOpeningEvent(this.s.id));
        // emit an event for the topology change; the initial description is always Unknown
        this.emit(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, new topology_description_1.TopologyDescription(common_1.TopologyType.Unknown), this.s.description));
        // connect all known servers, then attempt server selection to connect
        const serverDescriptions = Array.from(this.s.description.servers.values());
        const connectingServers = new Map();
        for (const serverDescription of serverDescriptions) {
            connectingServers.set(serverDescription.address, createAndConnectServer(this, serverDescription));
        }
        this.s.servers = connectingServers;
        // In load balancer mode we need to fake a server description getting
        // emitted from the monitor, since the monitor doesn't exist.
        if (this.s.options.loadBalanced) {
            for (const { hostAddress } of serverDescriptions) {
                this.serverUpdateHandler(new server_description_1.ServerDescription(hostAddress, undefined, {
                    loadBalanced: this.s.options.loadBalanced
                }));
            }
        }
        // Report a failure through the callback when there is one, otherwise as an event.
        const exitWithError = (error) => {
            if (callback) {
                return callback(error);
            }
            return this.emit(Topology.ERROR, error);
        };
        // Shared success path: move to CONNECTED and notify listeners and the caller.
        const finishConnect = () => {
            stateTransition(this, common_1.STATE_CONNECTED);
            this.emit(Topology.OPEN, this);
            this.emit(Topology.CONNECT, this);
            callback?.(undefined, this);
        };
        const readPreference = options.readPreference ?? read_preference_1.ReadPreference.primary;
        const selector = (0, server_selection_1.readPreferenceServerSelector)(readPreference);
        this.selectServer(selector, options, (selectionError, server) => {
            if (selectionError) {
                return this.close({ force: false }, () => exitWithError(selectionError));
            }
            // TODO: NODE-2471
            const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
            const shouldPing = !skipPingOnConnect && server && this.s.credentials;
            if (!shouldPing) {
                finishConnect();
                return;
            }
            server.command((0, utils_1.ns)('admin.$cmd'), { ping: 1 }, {}, pingError => {
                if (pingError) {
                    return exitWithError(pingError);
                }
                finishConnect();
            });
        });
    }
    /**
     * Destroys every server, drains pending selections and timers, and moves
     * the topology to the closed state.
     *
     * @param options - close options, a boolean `force` flag, or the callback itself
     * @param callback - invoked (without arguments) once closing has finished
     */
    close(options, callback) {
        if (typeof options === 'function') {
            callback = options;
            options = {};
        }
        if (typeof options === 'boolean') {
            options = { force: options };
        }
        options = options ?? {};
        const currentState = this.s.state;
        if (currentState === common_1.STATE_CLOSED || currentState === common_1.STATE_CLOSING) {
            return callback?.();
        }
        const destroyServerAsync = (0, util_1.promisify)(destroyServer);
        const pendingDestroys = Array.from(this.s.servers.values(), server => destroyServerAsync(server, this, options));
        Promise.all(pendingDestroys)
            .then(() => {
            this.s.servers.clear();
            stateTransition(this, common_1.STATE_CLOSING);
            drainWaitQueue(this[kWaitQueue], new error_1.MongoTopologyClosedError());
            (0, common_1.drainTimerQueue)(this.s.connectionTimers);
            const poller = this.s.srvPoller;
            if (poller) {
                poller.stop();
                poller.removeListener(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
            }
            this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
            stateTransition(this, common_1.STATE_CLOSED);
            // emit an event for close
            this.emit(Topology.TOPOLOGY_CLOSED, new events_1.TopologyClosedEvent(this.s.id));
        })
            .finally(() => callback?.());
    }
    /**
     * Selects a server according to the selection predicate provided
     *
     * @param selector - a selector function, a read preference mode string, or a `ReadPreference`;
     *                   anything else falls back to the read preference found in `options`
     * @param options - Optional settings related to server selection
     * @param callback - The callback used to indicate success or failure
     */
    selectServer(selector, options, callback) {
        let serverSelector;
        if (typeof selector === 'function') {
            serverSelector = selector;
        }
        else if (typeof selector === 'string') {
            serverSelector = (0, server_selection_1.readPreferenceServerSelector)(read_preference_1.ReadPreference.fromString(selector));
        }
        else if (selector instanceof read_preference_1.ReadPreference) {
            serverSelector = (0, server_selection_1.readPreferenceServerSelector)(selector);
        }
        else {
            read_preference_1.ReadPreference.translate(options);
            const fallbackPreference = options.readPreference || read_preference_1.ReadPreference.primary;
            serverSelector = (0, server_selection_1.readPreferenceServerSelector)(fallbackPreference);
        }
        // Caller supplied options win over the topology level timeout default.
        options = { serverSelectionTimeoutMS: this.s.serverSelectionTimeoutMS, ...options };
        const isSharded = this.description.type === common_1.TopologyType.Sharded;
        const session = options.session;
        const transaction = session && session.transaction;
        // A sharded transaction that already has a server keeps using it.
        if (isSharded && transaction && transaction.server) {
            callback(undefined, transaction.server);
            return;
        }
        const waitQueueMember = { serverSelector, transaction, callback };
        const timeoutMS = options.serverSelectionTimeoutMS;
        if (timeoutMS) {
            const onSelectionTimeout = () => {
                waitQueueMember[kCancelled] = true;
                waitQueueMember.timer = undefined;
                const timeoutError = new error_1.MongoServerSelectionError(`Server selection timed out after ${timeoutMS} ms`, this.description);
                waitQueueMember.callback(timeoutError);
            };
            waitQueueMember.timer = (0, timers_1.setTimeout)(onSelectionTimeout, timeoutMS);
        }
        this[kWaitQueue].push(waitQueueMember);
        processWaitQueue(this);
    }
    // Sessions related methods
    /**
     * @returns Whether the topology should initiate selection to determine session support
     */
    shouldCheckForSessionSupport() {
        const { description } = this;
        return description.type === common_1.TopologyType.Single
            ? !description.hasKnownServers
            : !description.hasDataBearingServers;
    }
    /**
     * @returns Whether sessions are supported on the current topology
     */
    hasSessionSupport() {
        return this.loadBalanced || this.description.logicalSessionTimeoutMinutes != null;
    }
    /**
     * Update the internal TopologyDescription with a ServerDescription
     *
     * @param serverDescription - The server to update in the internal list of server descriptions
     */
    serverUpdateHandler(serverDescription) {
        const { address } = serverDescription;
        // ignore updates for servers that are not part of the topology
        if (!this.s.description.hasServer(address)) {
            return;
        }
        // ignore this server update if its from an outdated topologyVersion
        if (isStaleServerDescription(this.s.description, serverDescription)) {
            return;
        }
        // these will be used for monitoring events later
        const previousTopologyDescription = this.s.description;
        const previousServerDescription = previousTopologyDescription.servers.get(address);
        if (!previousServerDescription) {
            return;
        }
        // Driver Sessions Spec: "Whenever a driver receives a cluster time from
        // a server it MUST compare it to the current highest seen cluster time
        // for the deployment. If the new cluster time is higher than the
        // highest seen cluster time it MUST become the new highest seen cluster
        // time. Two cluster times are compared using only the BsonTimestamp
        // value of the clusterTime embedded field."
        const clusterTime = serverDescription.$clusterTime;
        if (clusterTime) {
            (0, common_1._advanceClusterTime)(this, clusterTime);
        }
        // If we already know all the information contained in this updated description, then
        // we don't need to emit SDAM events, but still need to update the description, in order
        // to keep client-tracked attributes like last update time and round trip time up to date
        const unchanged = previousServerDescription.equals(serverDescription);
        // first update the TopologyDescription
        this.s.description = this.s.description.update(serverDescription);
        const { compatibilityError } = this.s.description;
        if (compatibilityError) {
            this.emit(Topology.ERROR, new error_1.MongoCompatibilityError(this.s.description.compatibilityError));
            return;
        }
        // emit monitoring events for this change
        if (!unchanged) {
            const updatedServerDescription = this.s.description.servers.get(address);
            if (updatedServerDescription) {
                this.emit(Topology.SERVER_DESCRIPTION_CHANGED, new events_1.ServerDescriptionChangedEvent(this.s.id, address, previousServerDescription, updatedServerDescription));
            }
        }
        // update server list from updated descriptions
        updateServers(this, serverDescription);
        // attempt to resolve any outstanding server selection attempts
        if (this[kWaitQueue].length > 0) {
            processWaitQueue(this);
        }
        if (!unchanged) {
            this.emit(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, previousTopologyDescription, this.s.description));
        }
    }
    /**
     * Does no authentication work here; it only reports success to the callback.
     *
     * @param credentials - ignored; may be the callback when credentials are omitted
     * @param callback - called with `(undefined, true)` when it is a function
     */
    auth(credentials, callback) {
        if (typeof credentials === 'function') {
            callback = credentials;
            credentials = undefined;
        }
        if (typeof callback === 'function') {
            callback(undefined, true);
        }
    }
    /** @returns the `metadata` option this topology was created with */
    get clientMetadata() {
        return this.s.options.metadata;
    }
    /** @returns true while the topology state is connected */
    isConnected() {
        return this.s.state === common_1.STATE_CONNECTED;
    }
    /** @returns true while the topology state is closed */
    isDestroyed() {
        return this.s.state === common_1.STATE_CLOSED;
    }
    // NOTE: There are many places in code where we explicitly check the last hello
    //       to do feature support detection. This should be done any other way, but for
    //       now we will just return the first hello seen, which should suffice.
    lastHello() {
        const known = Array.from(this.description.servers.values());
        if (known.length === 0) {
            return {};
        }
        // first description whose type is not Unknown, if there is one
        const firstKnown = known.find(candidate => candidate.type !== common_1.ServerType.Unknown);
        return firstKnown || { maxWireVersion: this.description.commonWireVersion };
    }
    /** @returns the `commonWireVersion` of the current topology description */
    get commonWireVersion() {
        return this.description.commonWireVersion;
    }
    /** @returns the `logicalSessionTimeoutMinutes` of the current topology description */
    get logicalSessionTimeoutMinutes() {
        return this.description.logicalSessionTimeoutMinutes;
    }
    /** @returns the cluster time currently stored on this topology */
    get clusterTime() {
        return this.s.clusterTime;
    }
    set clusterTime(clusterTime) {
        this.s.clusterTime = clusterTime;
    }
}
 | |
exports.Topology = Topology;
// Static event names exposed on the class, each taken from the shared constants module.
Object.assign(Topology, {
    /** @event */
    SERVER_OPENING: constants_1.SERVER_OPENING,
    /** @event */
    SERVER_CLOSED: constants_1.SERVER_CLOSED,
    /** @event */
    SERVER_DESCRIPTION_CHANGED: constants_1.SERVER_DESCRIPTION_CHANGED,
    /** @event */
    TOPOLOGY_OPENING: constants_1.TOPOLOGY_OPENING,
    /** @event */
    TOPOLOGY_CLOSED: constants_1.TOPOLOGY_CLOSED,
    /** @event */
    TOPOLOGY_DESCRIPTION_CHANGED: constants_1.TOPOLOGY_DESCRIPTION_CHANGED,
    /** @event */
    ERROR: constants_1.ERROR,
    /** @event */
    OPEN: constants_1.OPEN,
    /** @event */
    CONNECT: constants_1.CONNECT,
    /** @event */
    CLOSE: constants_1.CLOSE,
    /** @event */
    TIMEOUT: constants_1.TIMEOUT
});
 | |
/**
 * Destroys a server, and removes all event listeners from the instance.
 *
 * Local server event listeners are removed immediately; relayed event
 * listeners are removed only after the server reports it was destroyed, once
 * the topology has emitted SERVER_CLOSED for it.
 *
 * @param server - the server to destroy
 * @param topology - the topology that emits the SERVER_CLOSED event
 * @param options - passed to `server.destroy`; defaults to an empty object
 * @param callback - optional, invoked without arguments when destruction finished
 */
function destroyServer(server, topology, options, callback) {
    const destroyOptions = options ?? {};
    for (const localEvent of constants_1.LOCAL_SERVER_EVENTS) {
        server.removeAllListeners(localEvent);
    }
    const onDestroyed = () => {
        topology.emit(Topology.SERVER_CLOSED, new events_1.ServerClosedEvent(topology.s.id, server.description.address));
        for (const relayedEvent of constants_1.SERVER_RELAY_EVENTS) {
            server.removeAllListeners(relayedEvent);
        }
        if (typeof callback === 'function') {
            callback();
        }
    };
    server.destroy(destroyOptions, onDestroyed);
}
 | |
/**
 * Predicts the TopologyType from options.
 *
 * Precedence: `directConnection` → Single, `replicaSet` → ReplicaSetNoPrimary,
 * `loadBalanced` → LoadBalanced, otherwise Unknown.
 *
 * @param options - topology options; may be nullish
 * @returns the predicted `TopologyType`
 */
function topologyTypeFromOptions(options) {
    const { TopologyType } = common_1;
    if (options?.directConnection) {
        return TopologyType.Single;
    }
    else if (options?.replicaSet) {
        return TopologyType.ReplicaSetNoPrimary;
    }
    else if (options?.loadBalanced) {
        return TopologyType.LoadBalanced;
    }
    else {
        return TopologyType.Unknown;
    }
}
 | |
/**
 * Creates a new server instance, wires its events into the topology and
 * attempts to connect it.
 *
 * @param topology - The topology that this server belongs to
 * @param serverDescription - The description for the server to initialize and connect to
 * @returns the newly created server
 */
function createAndConnectServer(topology, serverDescription) {
    const openingEvent = new events_1.ServerOpeningEvent(topology.s.id, serverDescription.address);
    topology.emit(Topology.SERVER_OPENING, openingEvent);
    const server = new server_1.Server(topology, serverDescription, topology.s.options);
    // re-emit relayed server events on the topology under the same name
    for (const relayedEvent of constants_1.SERVER_RELAY_EVENTS) {
        server.on(relayedEvent, (payload) => topology.emit(relayedEvent, payload));
    }
    // descriptions received by the server feed the topology's update handler
    server.on(server_1.Server.DESCRIPTION_RECEIVED, received => topology.serverUpdateHandler(received));
    server.connect();
    return server;
}
 | |
/**
 * Brings the topology's server instances in line with its current description:
 * applies an incoming description to its server, creates servers for newly
 * described addresses and destroys servers that are no longer described.
 *
 * @param topology - Topology to update.
 * @param incomingServerDescription - New server description (optional).
 */
function updateServers(topology, incomingServerDescription) {
    // update the internal server's description
    const knownServer = incomingServerDescription
        ? topology.s.servers.get(incomingServerDescription.address)
        : undefined;
    if (knownServer) {
        knownServer.s.description = incomingServerDescription;
        const incomingError = incomingServerDescription.error;
        const mustResetPool = incomingError instanceof error_1.MongoError &&
            incomingError.hasErrorLabel(error_1.MongoErrorLabel.ResetPool);
        if (mustResetPool) {
            const interruptInUseConnections = incomingError.hasErrorLabel(error_1.MongoErrorLabel.InterruptInUseConnections);
            knownServer.s.pool.clear({ interruptInUseConnections });
        }
        else if (incomingError == null) {
            // An error-free description marks the pool ready when the server is
            // data bearing, or is of a known type in a Single topology.
            const topologyType = topology.s.description.type;
            const isKnownSingle = incomingServerDescription.type !== common_1.ServerType.Unknown &&
                topologyType === common_1.TopologyType.Single;
            if (incomingServerDescription.isDataBearing || isKnownSingle) {
                knownServer.s.pool.ready();
            }
        }
    }
    // add new servers for all descriptions we currently don't know about locally
    for (const described of topology.description.servers.values()) {
        if (topology.s.servers.has(described.address)) {
            continue;
        }
        topology.s.servers.set(described.address, createAndConnectServer(topology, described));
    }
    // for all servers no longer known, remove their descriptions and destroy their instances
    for (const [serverAddress, staleServer] of topology.s.servers) {
        if (topology.description.hasServer(serverAddress)) {
            continue;
        }
        topology.s.servers.delete(serverAddress);
        // prepare server for garbage collection
        if (staleServer) {
            destroyServer(staleServer, topology);
        }
    }
}
 | |
/**
 * Empties the wait queue, failing every pending selection with `err`.
 *
 * Timers of drained members are cleared. Members already flagged as cancelled
 * are dropped without invoking their callback.
 *
 * @param queue - the wait queue to drain
 * @param err - the error handed to each pending callback
 */
function drainWaitQueue(queue, err) {
    while (queue.length) {
        const pending = queue.shift();
        if (pending) {
            if (pending.timer) {
                (0, timers_1.clearTimeout)(pending.timer);
            }
            const alreadyCancelled = pending[kCancelled];
            if (!alreadyCancelled) {
                pending.callback(err);
            }
        }
    }
}
 | |
/**
 * Attempts to resolve every pending server selection against the topology's
 * current description.
 *
 * Each member present when the call starts is taken off the queue once:
 * - cancelled members are dropped;
 * - if the selector throws, the member's callback receives the error;
 * - if no description matches, the member is queued again;
 * - otherwise a server is picked (with several candidates, two are sampled and
 *   one of them is chosen by comparing operation counts) and handed to the callback.
 *
 * If members are still waiting afterwards, every server is asked to run a
 * check on the next tick.
 *
 * @param topology - the topology whose wait queue should be processed
 */
function processWaitQueue(topology) {
    if (topology.s.state === common_1.STATE_CLOSED) {
        drainWaitQueue(topology[kWaitQueue], new error_1.MongoTopologyClosedError());
        return;
    }
    const isSharded = topology.description.type === common_1.TopologyType.Sharded;
    const serverDescriptions = Array.from(topology.description.servers.values());
    // Only walk the members present right now; re-queued members are appended
    // behind them and must not be visited again in this pass.
    const membersToProcess = topology[kWaitQueue].length;
    for (let i = 0; i < membersToProcess; ++i) {
        const waitQueueMember = topology[kWaitQueue].shift();
        if (!waitQueueMember) {
            continue;
        }
        if (waitQueueMember[kCancelled]) {
            continue;
        }
        let selectedDescriptions;
        try {
            const serverSelector = waitQueueMember.serverSelector;
            selectedDescriptions = serverSelector
                ? serverSelector(topology.description, serverDescriptions)
                : serverDescriptions;
        }
        catch (e) {
            if (waitQueueMember.timer) {
                (0, timers_1.clearTimeout)(waitQueueMember.timer);
            }
            waitQueueMember.callback(e);
            continue;
        }
        let selectedServer;
        if (selectedDescriptions.length === 0) {
            // nothing suitable yet: keep waiting for the next description update
            topology[kWaitQueue].push(waitQueueMember);
            continue;
        }
        else if (selectedDescriptions.length === 1) {
            selectedServer = topology.s.servers.get(selectedDescriptions[0].address);
        }
        else {
            const descriptions = (0, utils_1.shuffle)(selectedDescriptions, 2);
            const server1 = topology.s.servers.get(descriptions[0].address);
            const server2 = topology.s.servers.get(descriptions[1].address);
            selectedServer =
                server1 && server2 && server1.s.operationCount < server2.s.operationCount
                    ? server1
                    : server2;
        }
        if (!selectedServer) {
            // The member is being completed here, so its selection timer must be
            // cancelled; otherwise the timer would later fire and invoke the same
            // callback a second time with a timeout error.
            if (waitQueueMember.timer) {
                (0, timers_1.clearTimeout)(waitQueueMember.timer);
            }
            waitQueueMember.callback(new error_1.MongoServerSelectionError('server selection returned a server description but the server was not found in the topology', topology.description));
            return;
        }
        const transaction = waitQueueMember.transaction;
        if (isSharded && transaction && transaction.isActive && selectedServer) {
            transaction.pinServer(selectedServer);
        }
        if (waitQueueMember.timer) {
            (0, timers_1.clearTimeout)(waitQueueMember.timer);
        }
        waitQueueMember.callback(undefined, selectedServer);
    }
    if (topology[kWaitQueue].length > 0) {
        // ensure all server monitors attempt monitoring soon
        for (const [, server] of topology.s.servers) {
            process.nextTick(function scheduleServerCheck() {
                return server.requestCheck();
            });
        }
    }
}
 | |
/**
 * Decides whether an incoming server description should be ignored because
 * the topology already holds one for the same address whose topologyVersion
 * compares greater.
 *
 * @param topologyDescription - the current topology description
 * @param incomingServerDescription - the description that just arrived
 * @returns true when `compareTopologyVersion(current, incoming)` is greater than 0
 */
function isStaleServerDescription(topologyDescription, incomingServerDescription) {
    const { address, topologyVersion: incomingVersion } = incomingServerDescription;
    const knownVersion = topologyDescription.servers.get(address)?.topologyVersion;
    const ordering = (0, server_description_1.compareTopologyVersion)(knownVersion, incomingVersion);
    return ordering > 0;
}
 | |
/**
 * Feature flags derived from the wire version range found in a hello response.
 * @public
 */
class ServerCapabilities {
    /**
     * @param hello - object carrying `minWireVersion` / `maxWireVersion`.
     *                Missing or falsy versions, and a nullish `hello`, are
     *                treated as wire version 0.
     */
    constructor(hello) {
        // Guard the dereference so a missing hello does not throw a TypeError.
        this.minWireVersion = hello?.minWireVersion || 0;
        this.maxWireVersion = hello?.maxWireVersion || 0;
    }
    /** True when maxWireVersion is at least 1. */
    get hasAggregationCursor() {
        return this.maxWireVersion >= 1;
    }
    /** True when maxWireVersion is at least 2. */
    get hasWriteCommands() {
        return this.maxWireVersion >= 2;
    }
    /** True when minWireVersion is at least 0, i.e. for every non-negative wire version. */
    get hasTextSearch() {
        return this.minWireVersion >= 0;
    }
    /** True when maxWireVersion is at least 1. */
    get hasAuthCommands() {
        return this.maxWireVersion >= 1;
    }
    /** True when maxWireVersion is at least 3. */
    get hasListCollectionsCommand() {
        return this.maxWireVersion >= 3;
    }
    /** True when maxWireVersion is at least 3. */
    get hasListIndexesCommand() {
        return this.maxWireVersion >= 3;
    }
    /** True when maxWireVersion is at least 13. */
    get supportsSnapshotReads() {
        return this.maxWireVersion >= 13;
    }
    /** True when maxWireVersion is at least 5. */
    get commandsTakeWriteConcern() {
        return this.maxWireVersion >= 5;
    }
    /** True when maxWireVersion is at least 5. */
    get commandsTakeCollation() {
        return this.maxWireVersion >= 5;
    }
}
 | |
| exports.ServerCapabilities = ServerCapabilities;
 | |
| //# sourceMappingURL=topology.js.map
 |