You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					416 lines
				
				16 KiB
			
		
		
			
		
	
	
					416 lines
				
				16 KiB
			| 
											3 years ago
										 | "use strict"; | ||
|  | Object.defineProperty(exports, "__esModule", { value: true }); | ||
|  | exports.MonitorInterval = exports.RTTPinger = exports.Monitor = void 0; | ||
|  | const timers_1 = require("timers"); | ||
|  | const bson_1 = require("../bson"); | ||
|  | const connect_1 = require("../cmap/connect"); | ||
|  | const connection_1 = require("../cmap/connection"); | ||
|  | const constants_1 = require("../constants"); | ||
|  | const error_1 = require("../error"); | ||
|  | const mongo_types_1 = require("../mongo_types"); | ||
|  | const utils_1 = require("../utils"); | ||
|  | const common_1 = require("./common"); | ||
|  | const events_1 = require("./events"); | ||
|  | const server_1 = require("./server"); | ||
|  | /** @internal */ | ||
|  | const kServer = Symbol('server'); | ||
|  | /** @internal */ | ||
|  | const kMonitorId = Symbol('monitorId'); | ||
|  | /** @internal */ | ||
|  | const kConnection = Symbol('connection'); | ||
|  | /** @internal */ | ||
|  | const kCancellationToken = Symbol('cancellationToken'); | ||
|  | /** @internal */ | ||
|  | const kRTTPinger = Symbol('rttPinger'); | ||
|  | /** @internal */ | ||
|  | const kRoundTripTime = Symbol('roundTripTime'); | ||
|  | const STATE_IDLE = 'idle'; | ||
|  | const STATE_MONITORING = 'monitoring'; | ||
|  | const stateTransition = (0, utils_1.makeStateMachine)({ | ||
|  |     [common_1.STATE_CLOSING]: [common_1.STATE_CLOSING, STATE_IDLE, common_1.STATE_CLOSED], | ||
|  |     [common_1.STATE_CLOSED]: [common_1.STATE_CLOSED, STATE_MONITORING], | ||
|  |     [STATE_IDLE]: [STATE_IDLE, STATE_MONITORING, common_1.STATE_CLOSING], | ||
|  |     [STATE_MONITORING]: [STATE_MONITORING, STATE_IDLE, common_1.STATE_CLOSING] | ||
|  | }); | ||
|  | const INVALID_REQUEST_CHECK_STATES = new Set([common_1.STATE_CLOSING, common_1.STATE_CLOSED, STATE_MONITORING]); | ||
|  | function isInCloseState(monitor) { | ||
|  |     return monitor.s.state === common_1.STATE_CLOSED || monitor.s.state === common_1.STATE_CLOSING; | ||
|  | } | ||
|  | /** @internal */ | ||
|  | class Monitor extends mongo_types_1.TypedEventEmitter { | ||
|  |     get connection() { | ||
|  |         return this[kConnection]; | ||
|  |     } | ||
|  |     constructor(server, options) { | ||
|  |         super(); | ||
|  |         this[kServer] = server; | ||
|  |         this[kConnection] = undefined; | ||
|  |         this[kCancellationToken] = new mongo_types_1.CancellationToken(); | ||
|  |         this[kCancellationToken].setMaxListeners(Infinity); | ||
|  |         this[kMonitorId] = undefined; | ||
|  |         this.s = { | ||
|  |             state: common_1.STATE_CLOSED | ||
|  |         }; | ||
|  |         this.address = server.description.address; | ||
|  |         this.options = Object.freeze({ | ||
|  |             connectTimeoutMS: options.connectTimeoutMS ?? 10000, | ||
|  |             heartbeatFrequencyMS: options.heartbeatFrequencyMS ?? 10000, | ||
|  |             minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500 | ||
|  |         }); | ||
|  |         const cancellationToken = this[kCancellationToken]; | ||
|  |         // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
 | ||
|  |         const connectOptions = Object.assign({ | ||
|  |             id: '<monitor>', | ||
|  |             generation: server.s.pool.generation, | ||
|  |             connectionType: connection_1.Connection, | ||
|  |             cancellationToken, | ||
|  |             hostAddress: server.description.hostAddress | ||
|  |         }, options,  | ||
|  |         // force BSON serialization options
 | ||
|  |         { | ||
|  |             raw: false, | ||
|  |             promoteLongs: true, | ||
|  |             promoteValues: true, | ||
|  |             promoteBuffers: true | ||
|  |         }); | ||
|  |         // ensure no authentication is used for monitoring
 | ||
|  |         delete connectOptions.credentials; | ||
|  |         if (connectOptions.autoEncrypter) { | ||
|  |             delete connectOptions.autoEncrypter; | ||
|  |         } | ||
|  |         this.connectOptions = Object.freeze(connectOptions); | ||
|  |     } | ||
|  |     connect() { | ||
|  |         if (this.s.state !== common_1.STATE_CLOSED) { | ||
|  |             return; | ||
|  |         } | ||
|  |         // start
 | ||
|  |         const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS; | ||
|  |         const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS; | ||
|  |         this[kMonitorId] = new MonitorInterval(monitorServer(this), { | ||
|  |             heartbeatFrequencyMS: heartbeatFrequencyMS, | ||
|  |             minHeartbeatFrequencyMS: minHeartbeatFrequencyMS, | ||
|  |             immediate: true | ||
|  |         }); | ||
|  |     } | ||
|  |     requestCheck() { | ||
|  |         if (INVALID_REQUEST_CHECK_STATES.has(this.s.state)) { | ||
|  |             return; | ||
|  |         } | ||
|  |         this[kMonitorId]?.wake(); | ||
|  |     } | ||
|  |     reset() { | ||
|  |         const topologyVersion = this[kServer].description.topologyVersion; | ||
|  |         if (isInCloseState(this) || topologyVersion == null) { | ||
|  |             return; | ||
|  |         } | ||
|  |         stateTransition(this, common_1.STATE_CLOSING); | ||
|  |         resetMonitorState(this); | ||
|  |         // restart monitor
 | ||
|  |         stateTransition(this, STATE_IDLE); | ||
|  |         // restart monitoring
 | ||
|  |         const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS; | ||
|  |         const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS; | ||
|  |         this[kMonitorId] = new MonitorInterval(monitorServer(this), { | ||
|  |             heartbeatFrequencyMS: heartbeatFrequencyMS, | ||
|  |             minHeartbeatFrequencyMS: minHeartbeatFrequencyMS | ||
|  |         }); | ||
|  |     } | ||
|  |     close() { | ||
|  |         if (isInCloseState(this)) { | ||
|  |             return; | ||
|  |         } | ||
|  |         stateTransition(this, common_1.STATE_CLOSING); | ||
|  |         resetMonitorState(this); | ||
|  |         // close monitor
 | ||
|  |         this.emit('close'); | ||
|  |         stateTransition(this, common_1.STATE_CLOSED); | ||
|  |     } | ||
|  | } | ||
|  | exports.Monitor = Monitor; | ||
|  | function resetMonitorState(monitor) { | ||
|  |     monitor[kMonitorId]?.stop(); | ||
|  |     monitor[kMonitorId] = undefined; | ||
|  |     monitor[kRTTPinger]?.close(); | ||
|  |     monitor[kRTTPinger] = undefined; | ||
|  |     monitor[kCancellationToken].emit('cancel'); | ||
|  |     monitor[kConnection]?.destroy({ force: true }); | ||
|  |     monitor[kConnection] = undefined; | ||
|  | } | ||
|  | function checkServer(monitor, callback) { | ||
|  |     let start = (0, utils_1.now)(); | ||
|  |     monitor.emit(server_1.Server.SERVER_HEARTBEAT_STARTED, new events_1.ServerHeartbeatStartedEvent(monitor.address)); | ||
|  |     function failureHandler(err) { | ||
|  |         monitor[kConnection]?.destroy({ force: true }); | ||
|  |         monitor[kConnection] = undefined; | ||
|  |         monitor.emit(server_1.Server.SERVER_HEARTBEAT_FAILED, new events_1.ServerHeartbeatFailedEvent(monitor.address, (0, utils_1.calculateDurationInMs)(start), err)); | ||
|  |         const error = !(err instanceof error_1.MongoError) ? new error_1.MongoError(err) : err; | ||
|  |         error.addErrorLabel(error_1.MongoErrorLabel.ResetPool); | ||
|  |         if (error instanceof error_1.MongoNetworkTimeoutError) { | ||
|  |             error.addErrorLabel(error_1.MongoErrorLabel.InterruptInUseConnections); | ||
|  |         } | ||
|  |         monitor.emit('resetServer', error); | ||
|  |         callback(err); | ||
|  |     } | ||
|  |     const connection = monitor[kConnection]; | ||
|  |     if (connection && !connection.closed) { | ||
|  |         const { serverApi, helloOk } = connection; | ||
|  |         const connectTimeoutMS = monitor.options.connectTimeoutMS; | ||
|  |         const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS; | ||
|  |         const topologyVersion = monitor[kServer].description.topologyVersion; | ||
|  |         const isAwaitable = topologyVersion != null; | ||
|  |         const cmd = { | ||
|  |             [serverApi?.version || helloOk ? 'hello' : constants_1.LEGACY_HELLO_COMMAND]: 1, | ||
|  |             ...(isAwaitable && topologyVersion | ||
|  |                 ? { maxAwaitTimeMS, topologyVersion: makeTopologyVersion(topologyVersion) } | ||
|  |                 : {}) | ||
|  |         }; | ||
|  |         const options = isAwaitable | ||
|  |             ? { | ||
|  |                 socketTimeoutMS: connectTimeoutMS ? connectTimeoutMS + maxAwaitTimeMS : 0, | ||
|  |                 exhaustAllowed: true | ||
|  |             } | ||
|  |             : { socketTimeoutMS: connectTimeoutMS }; | ||
|  |         if (isAwaitable && monitor[kRTTPinger] == null) { | ||
|  |             monitor[kRTTPinger] = new RTTPinger(monitor[kCancellationToken], Object.assign({ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS }, monitor.connectOptions)); | ||
|  |         } | ||
|  |         connection.command((0, utils_1.ns)('admin.$cmd'), cmd, options, (err, hello) => { | ||
|  |             if (err) { | ||
|  |                 return failureHandler(err); | ||
|  |             } | ||
|  |             if (!('isWritablePrimary' in hello)) { | ||
|  |                 // Provide hello-style response document.
 | ||
|  |                 hello.isWritablePrimary = hello[constants_1.LEGACY_HELLO_COMMAND]; | ||
|  |             } | ||
|  |             const rttPinger = monitor[kRTTPinger]; | ||
|  |             const duration = isAwaitable && rttPinger ? rttPinger.roundTripTime : (0, utils_1.calculateDurationInMs)(start); | ||
|  |             monitor.emit(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, new events_1.ServerHeartbeatSucceededEvent(monitor.address, duration, hello)); | ||
|  |             // if we are using the streaming protocol then we immediately issue another `started`
 | ||
|  |             // event, otherwise the "check" is complete and return to the main monitor loop
 | ||
|  |             if (isAwaitable && hello.topologyVersion) { | ||
|  |                 monitor.emit(server_1.Server.SERVER_HEARTBEAT_STARTED, new events_1.ServerHeartbeatStartedEvent(monitor.address)); | ||
|  |                 start = (0, utils_1.now)(); | ||
|  |             } | ||
|  |             else { | ||
|  |                 monitor[kRTTPinger]?.close(); | ||
|  |                 monitor[kRTTPinger] = undefined; | ||
|  |                 callback(undefined, hello); | ||
|  |             } | ||
|  |         }); | ||
|  |         return; | ||
|  |     } | ||
|  |     // connecting does an implicit `hello`
 | ||
|  |     (0, connect_1.connect)(monitor.connectOptions, (err, conn) => { | ||
|  |         if (err) { | ||
|  |             monitor[kConnection] = undefined; | ||
|  |             failureHandler(err); | ||
|  |             return; | ||
|  |         } | ||
|  |         if (conn) { | ||
|  |             // Tell the connection that we are using the streaming protocol so that the
 | ||
|  |             // connection's message stream will only read the last hello on the buffer.
 | ||
|  |             conn.isMonitoringConnection = true; | ||
|  |             if (isInCloseState(monitor)) { | ||
|  |                 conn.destroy({ force: true }); | ||
|  |                 return; | ||
|  |             } | ||
|  |             monitor[kConnection] = conn; | ||
|  |             monitor.emit(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, new events_1.ServerHeartbeatSucceededEvent(monitor.address, (0, utils_1.calculateDurationInMs)(start), conn.hello)); | ||
|  |             callback(undefined, conn.hello); | ||
|  |         } | ||
|  |     }); | ||
|  | } | ||
|  | function monitorServer(monitor) { | ||
|  |     return (callback) => { | ||
|  |         if (monitor.s.state === STATE_MONITORING) { | ||
|  |             process.nextTick(callback); | ||
|  |             return; | ||
|  |         } | ||
|  |         stateTransition(monitor, STATE_MONITORING); | ||
|  |         function done() { | ||
|  |             if (!isInCloseState(monitor)) { | ||
|  |                 stateTransition(monitor, STATE_IDLE); | ||
|  |             } | ||
|  |             callback(); | ||
|  |         } | ||
|  |         checkServer(monitor, (err, hello) => { | ||
|  |             if (err) { | ||
|  |                 // otherwise an error occurred on initial discovery, also bail
 | ||
|  |                 if (monitor[kServer].description.type === common_1.ServerType.Unknown) { | ||
|  |                     return done(); | ||
|  |                 } | ||
|  |             } | ||
|  |             // if the check indicates streaming is supported, immediately reschedule monitoring
 | ||
|  |             if (hello && hello.topologyVersion) { | ||
|  |                 (0, timers_1.setTimeout)(() => { | ||
|  |                     if (!isInCloseState(monitor)) { | ||
|  |                         monitor[kMonitorId]?.wake(); | ||
|  |                     } | ||
|  |                 }, 0); | ||
|  |             } | ||
|  |             done(); | ||
|  |         }); | ||
|  |     }; | ||
|  | } | ||
|  | function makeTopologyVersion(tv) { | ||
|  |     return { | ||
|  |         processId: tv.processId, | ||
|  |         // tests mock counter as just number, but in a real situation counter should always be a Long
 | ||
|  |         // TODO(NODE-2674): Preserve int64 sent from MongoDB
 | ||
|  |         counter: bson_1.Long.isLong(tv.counter) ? tv.counter : bson_1.Long.fromNumber(tv.counter) | ||
|  |     }; | ||
|  | } | ||
|  | /** @internal */ | ||
|  | class RTTPinger { | ||
|  |     constructor(cancellationToken, options) { | ||
|  |         this[kConnection] = undefined; | ||
|  |         this[kCancellationToken] = cancellationToken; | ||
|  |         this[kRoundTripTime] = 0; | ||
|  |         this.closed = false; | ||
|  |         const heartbeatFrequencyMS = options.heartbeatFrequencyMS; | ||
|  |         this[kMonitorId] = (0, timers_1.setTimeout)(() => measureRoundTripTime(this, options), heartbeatFrequencyMS); | ||
|  |     } | ||
|  |     get roundTripTime() { | ||
|  |         return this[kRoundTripTime]; | ||
|  |     } | ||
|  |     close() { | ||
|  |         this.closed = true; | ||
|  |         (0, timers_1.clearTimeout)(this[kMonitorId]); | ||
|  |         this[kConnection]?.destroy({ force: true }); | ||
|  |         this[kConnection] = undefined; | ||
|  |     } | ||
|  | } | ||
|  | exports.RTTPinger = RTTPinger; | ||
|  | function measureRoundTripTime(rttPinger, options) { | ||
|  |     const start = (0, utils_1.now)(); | ||
|  |     options.cancellationToken = rttPinger[kCancellationToken]; | ||
|  |     const heartbeatFrequencyMS = options.heartbeatFrequencyMS; | ||
|  |     if (rttPinger.closed) { | ||
|  |         return; | ||
|  |     } | ||
|  |     function measureAndReschedule(conn) { | ||
|  |         if (rttPinger.closed) { | ||
|  |             conn?.destroy({ force: true }); | ||
|  |             return; | ||
|  |         } | ||
|  |         if (rttPinger[kConnection] == null) { | ||
|  |             rttPinger[kConnection] = conn; | ||
|  |         } | ||
|  |         rttPinger[kRoundTripTime] = (0, utils_1.calculateDurationInMs)(start); | ||
|  |         rttPinger[kMonitorId] = (0, timers_1.setTimeout)(() => measureRoundTripTime(rttPinger, options), heartbeatFrequencyMS); | ||
|  |     } | ||
|  |     const connection = rttPinger[kConnection]; | ||
|  |     if (connection == null) { | ||
|  |         (0, connect_1.connect)(options, (err, conn) => { | ||
|  |             if (err) { | ||
|  |                 rttPinger[kConnection] = undefined; | ||
|  |                 rttPinger[kRoundTripTime] = 0; | ||
|  |                 return; | ||
|  |             } | ||
|  |             measureAndReschedule(conn); | ||
|  |         }); | ||
|  |         return; | ||
|  |     } | ||
|  |     connection.command((0, utils_1.ns)('admin.$cmd'), { [constants_1.LEGACY_HELLO_COMMAND]: 1 }, undefined, err => { | ||
|  |         if (err) { | ||
|  |             rttPinger[kConnection] = undefined; | ||
|  |             rttPinger[kRoundTripTime] = 0; | ||
|  |             return; | ||
|  |         } | ||
|  |         measureAndReschedule(); | ||
|  |     }); | ||
|  | } | ||
|  | /** | ||
|  |  * @internal | ||
|  |  */ | ||
|  | class MonitorInterval { | ||
|  |     constructor(fn, options = {}) { | ||
|  |         this.isExpeditedCallToFnScheduled = false; | ||
|  |         this.stopped = false; | ||
|  |         this.isExecutionInProgress = false; | ||
|  |         this.hasExecutedOnce = false; | ||
|  |         this._executeAndReschedule = () => { | ||
|  |             if (this.stopped) | ||
|  |                 return; | ||
|  |             if (this.timerId) { | ||
|  |                 (0, timers_1.clearTimeout)(this.timerId); | ||
|  |             } | ||
|  |             this.isExpeditedCallToFnScheduled = false; | ||
|  |             this.isExecutionInProgress = true; | ||
|  |             this.fn(() => { | ||
|  |                 this.lastExecutionEnded = (0, utils_1.now)(); | ||
|  |                 this.isExecutionInProgress = false; | ||
|  |                 this._reschedule(this.heartbeatFrequencyMS); | ||
|  |             }); | ||
|  |         }; | ||
|  |         this.fn = fn; | ||
|  |         this.lastExecutionEnded = -Infinity; | ||
|  |         this.heartbeatFrequencyMS = options.heartbeatFrequencyMS ?? 1000; | ||
|  |         this.minHeartbeatFrequencyMS = options.minHeartbeatFrequencyMS ?? 500; | ||
|  |         if (options.immediate) { | ||
|  |             this._executeAndReschedule(); | ||
|  |         } | ||
|  |         else { | ||
|  |             this._reschedule(undefined); | ||
|  |         } | ||
|  |     } | ||
|  |     wake() { | ||
|  |         const currentTime = (0, utils_1.now)(); | ||
|  |         const timeSinceLastCall = currentTime - this.lastExecutionEnded; | ||
|  |         // TODO(NODE-4674): Add error handling and logging to the monitor
 | ||
|  |         if (timeSinceLastCall < 0) { | ||
|  |             return this._executeAndReschedule(); | ||
|  |         } | ||
|  |         if (this.isExecutionInProgress) { | ||
|  |             return; | ||
|  |         } | ||
|  |         // debounce multiple calls to wake within the `minInterval`
 | ||
|  |         if (this.isExpeditedCallToFnScheduled) { | ||
|  |             return; | ||
|  |         } | ||
|  |         // reschedule a call as soon as possible, ensuring the call never happens
 | ||
|  |         // faster than the `minInterval`
 | ||
|  |         if (timeSinceLastCall < this.minHeartbeatFrequencyMS) { | ||
|  |             this.isExpeditedCallToFnScheduled = true; | ||
|  |             this._reschedule(this.minHeartbeatFrequencyMS - timeSinceLastCall); | ||
|  |             return; | ||
|  |         } | ||
|  |         this._executeAndReschedule(); | ||
|  |     } | ||
|  |     stop() { | ||
|  |         this.stopped = true; | ||
|  |         if (this.timerId) { | ||
|  |             (0, timers_1.clearTimeout)(this.timerId); | ||
|  |             this.timerId = undefined; | ||
|  |         } | ||
|  |         this.lastExecutionEnded = -Infinity; | ||
|  |         this.isExpeditedCallToFnScheduled = false; | ||
|  |     } | ||
|  |     toString() { | ||
|  |         return JSON.stringify(this); | ||
|  |     } | ||
|  |     toJSON() { | ||
|  |         const currentTime = (0, utils_1.now)(); | ||
|  |         const timeSinceLastCall = currentTime - this.lastExecutionEnded; | ||
|  |         return { | ||
|  |             timerId: this.timerId != null ? 'set' : 'cleared', | ||
|  |             lastCallTime: this.lastExecutionEnded, | ||
|  |             isExpeditedCheckScheduled: this.isExpeditedCallToFnScheduled, | ||
|  |             stopped: this.stopped, | ||
|  |             heartbeatFrequencyMS: this.heartbeatFrequencyMS, | ||
|  |             minHeartbeatFrequencyMS: this.minHeartbeatFrequencyMS, | ||
|  |             currentTime, | ||
|  |             timeSinceLastCall | ||
|  |         }; | ||
|  |     } | ||
|  |     _reschedule(ms) { | ||
|  |         if (this.stopped) | ||
|  |             return; | ||
|  |         if (this.timerId) { | ||
|  |             (0, timers_1.clearTimeout)(this.timerId); | ||
|  |         } | ||
|  |         this.timerId = (0, timers_1.setTimeout)(this._executeAndReschedule, ms || this.heartbeatFrequencyMS); | ||
|  |     } | ||
|  | } | ||
|  | exports.MonitorInterval = MonitorInterval; | ||
|  | //# sourceMappingURL=monitor.js.map
 |