You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					226 lines
				
				9.9 KiB
			
		
		
			
		
	
	
					226 lines
				
				9.9 KiB
			| 
											3 years ago
										 | "use strict"; | ||
|  | Object.defineProperty(exports, "__esModule", { value: true }); | ||
|  | exports.readPreferenceServerSelector = exports.secondaryWritableServerSelector = exports.sameServerSelector = exports.writableServerSelector = exports.MIN_SECONDARY_WRITE_WIRE_VERSION = void 0; | ||
|  | const error_1 = require("../error"); | ||
|  | const read_preference_1 = require("../read_preference"); | ||
|  | const common_1 = require("./common"); | ||
|  | // max staleness constants
 | ||
|  | const IDLE_WRITE_PERIOD = 10000; | ||
|  | const SMALLEST_MAX_STALENESS_SECONDS = 90; | ||
|  | //  Minimum version to try writes on secondaries.
 | ||
|  | exports.MIN_SECONDARY_WRITE_WIRE_VERSION = 13; | ||
/**
 * Creates a server selector that keeps only the servers whose `isWritable`
 * flag is truthy and then narrows that subset to the latency window.
 *
 * @returns A `(topologyDescription, servers)` selector function
 */
function writableServerSelector() {
    return (topologyDescription, servers) => {
        const writableServers = servers.filter(candidate => candidate.isWritable);
        return latencyWindowReducer(topologyDescription, writableServers);
    };
}
|  | exports.writableServerSelector = writableServerSelector; | ||
/**
 * Creates a selector that sticks to one particular server. A candidate is
 * kept only when its address equals the address of `description` and its
 * type is something other than `ServerType.Unknown`.
 *
 * @param description - Description of the server to keep selecting; when it
 *   is falsy the returned selector always yields an empty list
 * @returns A `(topologyDescription, servers)` selector function
 */
function sameServerSelector(description) {
    // Only invoked when `description` is truthy (see the ternary below).
    const isSameKnownServer = (candidate) => candidate.address === description.address &&
        candidate.type !== common_1.ServerType.Unknown;
    return (topologyDescription, servers) => (description ? servers.filter(isSameKnownServer) : []);
}
|  | exports.sameServerSelector = sameServerSelector; | ||
/**
 * Creates a read-preference based selector for an operation that may be
 * allowed to write on a secondary.
 *
 * The supplied read preference is honoured only when all of the following
 * hold; in every other case selection falls back to `ReadPreference.primary`:
 * - a read preference was supplied,
 * - a (truthy) wire version was supplied,
 * - the wire version is not below `MIN_SECONDARY_WRITE_WIRE_VERSION`.
 *
 * @param wireVersion - The wire version to compare against the minimum
 * @param readPreference - The read preference requested by the caller, if any
 * @returns The selector produced by `readPreferenceServerSelector`
 */
function secondaryWritableServerSelector(wireVersion, readPreference) {
    const honoursReadPreference = Boolean(readPreference) &&
        Boolean(wireVersion) &&
        !(wireVersion < exports.MIN_SECONDARY_WRITE_WIRE_VERSION);
    const effectiveReadPreference = honoursReadPreference
        ? readPreference
        : read_preference_1.ReadPreference.primary;
    return readPreferenceServerSelector(effectiveReadPreference);
}
|  | exports.secondaryWritableServerSelector = secondaryWritableServerSelector; | ||
/**
 * Drops the servers that are too stale for the read preference, following the
 * "Max Staleness" specification:
 * https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.rst
 *
 * When `maxStalenessSeconds` is nullish or negative the list is returned
 * untouched. Topology types other than `ReplicaSetWithPrimary` and
 * `ReplicaSetNoPrimary` are also returned untouched (after validation).
 *
 * @param readPreference - The read preference carrying `maxStalenessSeconds`
 * @param topologyDescription - The topology description
 * @param servers - The server descriptions to reduce
 * @returns The servers whose estimated staleness is within the limit
 * @throws MongoInvalidArgumentError when `maxStalenessSeconds` is smaller than
 *   `(heartbeatFrequencyMS + IDLE_WRITE_PERIOD) / 1000` or smaller than
 *   `SMALLEST_MAX_STALENESS_SECONDS`
 */
function maxStalenessReducer(readPreference, topologyDescription, servers) {
    const maxStaleness = readPreference.maxStalenessSeconds;
    if (maxStaleness == null || maxStaleness < 0) {
        return servers;
    }
    const { heartbeatFrequencyMS } = topologyDescription;
    const maxStalenessVariance = (heartbeatFrequencyMS + IDLE_WRITE_PERIOD) / 1000;
    if (maxStaleness < maxStalenessVariance) {
        throw new error_1.MongoInvalidArgumentError(`Option "maxStalenessSeconds" must be at least ${maxStalenessVariance} seconds`);
    }
    if (maxStaleness < SMALLEST_MAX_STALENESS_SECONDS) {
        throw new error_1.MongoInvalidArgumentError(`Option "maxStalenessSeconds" must be at least ${SMALLEST_MAX_STALENESS_SECONDS} seconds`);
    }
    // Staleness is computed in milliseconds but the limit is in seconds.
    const isFreshEnough = (stalenessMS) => stalenessMS / 1000 <= maxStaleness;
    const topologyType = topologyDescription.type;
    if (topologyType === common_1.TopologyType.ReplicaSetWithPrimary) {
        // With a primary, staleness is measured relative to the primary's own lag.
        const primary = Array.from(topologyDescription.servers.values()).find(primaryFilter);
        return servers.filter(server => isFreshEnough(server.lastUpdateTime -
            server.lastWriteDate -
            (primary.lastUpdateTime - primary.lastWriteDate) +
            heartbeatFrequencyMS));
    }
    if (topologyType === common_1.TopologyType.ReplicaSetNoPrimary) {
        if (servers.length === 0) {
            return servers;
        }
        // Without a primary, the candidate with the latest write is the reference.
        let freshest = servers[0];
        for (let i = 1; i < servers.length; ++i) {
            if (servers[i].lastWriteDate > freshest.lastWriteDate) {
                freshest = servers[i];
            }
        }
        return servers.filter(server => isFreshEnough(freshest.lastWriteDate - server.lastWriteDate + heartbeatFrequencyMS));
    }
    return servers;
}
/**
 * Determines whether a server's tags satisfy a requested tag set.
 *
 * Every key of `tagSet` must be an own enumerable key of `serverTags` and
 * carry a strictly equal value. An empty tag set matches any server.
 * A missing (`null`/`undefined`) `serverTags` is treated as "no tags" instead
 * of raising a TypeError, so it only matches an empty tag set.
 *
 * @param tagSet - The requested tag set to match
 * @param serverTags - The server's tags (may be nullish)
 * @returns `true` when all requested tags are present with equal values
 */
function tagSetMatch(tagSet, serverTags) {
    const availableTags = serverTags ?? {};
    // A Set gives constant-time membership checks and ignores inherited keys,
    // exactly like the own-enumerable key list it is built from.
    const availableKeys = new Set(Object.keys(availableTags));
    return Object.keys(tagSet).every(key => availableKeys.has(key) && availableTags[key] === tagSet[key]);
}
/**
 * Narrows a list of server descriptions using the tag sets of a read
 * preference. Tag sets are tried in order and the servers matching the first
 * tag set that matches at least one server are returned.
 *
 * @param readPreference - The read preference providing the requested tags
 * @param servers - The list of server descriptions to reduce
 * @returns `servers` itself when no tags are requested (nullish or empty
 *   array); otherwise the matches of the first successful tag set, or an
 *   empty list when no tag set matches any server
 */
function tagSetReducer(readPreference, servers) {
    const requestedTagSets = readPreference.tags;
    if (requestedTagSets == null) {
        return servers;
    }
    if (Array.isArray(requestedTagSets) && requestedTagSets.length === 0) {
        return servers;
    }
    for (let index = 0; index < requestedTagSets.length; ++index) {
        const requested = requestedTagSets[index];
        const matches = servers.filter(server => tagSetMatch(requested, server.tags));
        if (matches.length > 0) {
            return matches;
        }
    }
    return [];
}
/**
 * Reduces a list of servers to those that fall within the acceptable latency
 * window, as described by the "Server Selection" specification:
 * https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst
 *
 * The window starts at the smallest `roundTripTime` among `servers` and spans
 * `topologyDescription.localThresholdMS` above it (both bounds inclusive).
 *
 * The minimum is seeded with `Infinity` rather than a `-1` sentinel, so a
 * server reporting a `roundTripTime` of `-1` can no longer be confused with
 * "no minimum yet" and the result does not depend on the order of `servers`.
 *
 * @param topologyDescription - The topology description
 * @param servers - The list of servers to reduce
 * @returns The servers which fall within an acceptable latency window
 */
function latencyWindowReducer(topologyDescription, servers) {
    const low = servers.reduce((min, server) => Math.min(server.roundTripTime, min), Infinity);
    const high = low + topologyDescription.localThresholdMS;
    return servers.filter(server => server.roundTripTime >= low && server.roundTripTime <= high);
}
|  | // filters
 | ||
/** Predicate: accepts a server description whose type is `ServerType.RSPrimary`. */
function primaryFilter(server) {
    const { type } = server;
    return type === common_1.ServerType.RSPrimary;
}
/** Predicate: accepts a server description whose type is `ServerType.RSSecondary`. */
function secondaryFilter(server) {
    const { type } = server;
    return type === common_1.ServerType.RSSecondary;
}
/**
 * Predicate: accepts a server description whose type is either
 * `ServerType.RSSecondary` or `ServerType.RSPrimary`.
 */
function nearestFilter(server) {
    const { type } = server;
    if (type === common_1.ServerType.RSSecondary) {
        return true;
    }
    return type === common_1.ServerType.RSPrimary;
}
/** Predicate: accepts any server description whose type is not `ServerType.Unknown`. */
function knownFilter(server) {
    const { type } = server;
    return !(type === common_1.ServerType.Unknown);
}
/** Predicate: accepts a server description whose type is `ServerType.LoadBalancer`. */
function loadBalancerFilter(server) {
    const { type } = server;
    return type === common_1.ServerType.LoadBalancer;
}
/**
 * Creates a server selector driven by a read preference.
 *
 * The returned selector handles topology types first (load balanced, unknown,
 * single/sharded) and only then applies the read preference mode, max
 * staleness, tag sets and the latency window to replica set members.
 *
 * @param readPreference - The read preference to select with
 * @returns A `(topologyDescription, servers)` selector function
 * @throws MongoInvalidArgumentError when `readPreference.isValid()` is false.
 *   The returned selector throws MongoCompatibilityError when the topology's
 *   common wire version is below the read preference's `minWireVersion`.
 */
function readPreferenceServerSelector(readPreference) {
    if (!readPreference.isValid()) {
        throw new error_1.MongoInvalidArgumentError('Invalid read preference specified');
    }
    return (topologyDescription, servers) => {
        const { commonWireVersion } = topologyDescription;
        if (commonWireVersion &&
            readPreference.minWireVersion &&
            readPreference.minWireVersion > commonWireVersion) {
            throw new error_1.MongoCompatibilityError(`Minimum wire version '${readPreference.minWireVersion}' required, but found '${commonWireVersion}'`);
        }
        // Topology types for which the read preference mode is not consulted.
        switch (topologyDescription.type) {
            case common_1.TopologyType.LoadBalanced:
                return servers.filter(loadBalancerFilter);
            case common_1.TopologyType.Unknown:
                return [];
            case common_1.TopologyType.Single:
            case common_1.TopologyType.Sharded:
                return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
            default:
                break;
        }
        const { mode } = readPreference;
        const wantsPrimaryOnly = mode === read_preference_1.ReadPreference.PRIMARY;
        if (wantsPrimaryOnly || mode === read_preference_1.ReadPreference.PRIMARY_PREFERRED) {
            const primaries = servers.filter(primaryFilter);
            // "primaryPreferred" falls through to the secondaries when no primary is available.
            if (wantsPrimaryOnly || primaries.length > 0) {
                return primaries;
            }
        }
        const memberFilter = mode === read_preference_1.ReadPreference.NEAREST ? nearestFilter : secondaryFilter;
        const eligibleMembers = servers.filter(memberFilter);
        const freshMembers = maxStalenessReducer(readPreference, topologyDescription, eligibleMembers);
        const taggedMembers = tagSetReducer(readPreference, freshMembers);
        const selectedServers = latencyWindowReducer(topologyDescription, taggedMembers);
        // "secondaryPreferred" falls back to the primary when no secondary qualifies.
        if (selectedServers.length === 0 && mode === read_preference_1.ReadPreference.SECONDARY_PREFERRED) {
            return servers.filter(primaryFilter);
        }
        return selectedServers;
    };
}
|  | exports.readPreferenceServerSelector = readPreferenceServerSelector; | ||
|  | //# sourceMappingURL=server_selection.js.map
 |