You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					417 lines
				
				12 KiB
			
		
		
			
		
	
	
					417 lines
				
				12 KiB
			| 
											3 years ago
										 | 'use strict'; | ||
|  | 
 | ||
|  | const EventEmitter = require('events'); | ||
|  | const MessageStream = require('./message_stream'); | ||
|  | const MongoError = require('../core/error').MongoError; | ||
|  | const MongoNetworkError = require('../core/error').MongoNetworkError; | ||
|  | const MongoNetworkTimeoutError = require('../core/error').MongoNetworkTimeoutError; | ||
|  | const MongoWriteConcernError = require('../core/error').MongoWriteConcernError; | ||
|  | const CommandResult = require('../core/connection/command_result'); | ||
|  | const StreamDescription = require('./stream_description').StreamDescription; | ||
|  | const wp = require('../core/wireprotocol'); | ||
|  | const apm = require('../core/connection/apm'); | ||
|  | const updateSessionFromResponse = require('../core/sessions').updateSessionFromResponse; | ||
|  | const uuidV4 = require('../core/utils').uuidV4; | ||
|  | const now = require('../utils').now; | ||
|  | const calculateDurationInMs = require('../utils').calculateDurationInMs; | ||
|  | 
 | ||
|  | const kStream = Symbol('stream'); | ||
|  | const kQueue = Symbol('queue'); | ||
|  | const kMessageStream = Symbol('messageStream'); | ||
|  | const kGeneration = Symbol('generation'); | ||
|  | const kLastUseTime = Symbol('lastUseTime'); | ||
|  | const kClusterTime = Symbol('clusterTime'); | ||
|  | const kDescription = Symbol('description'); | ||
|  | const kIsMaster = Symbol('ismaster'); | ||
|  | const kAutoEncrypter = Symbol('autoEncrypter'); | ||
|  | 
 | ||
|  | class Connection extends EventEmitter { | ||
|  |   constructor(stream, options) { | ||
|  |     super(options); | ||
|  | 
 | ||
|  |     this.id = options.id; | ||
|  |     this.address = streamIdentifier(stream); | ||
|  |     this.bson = options.bson; | ||
|  |     this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 0; | ||
|  |     this.host = options.host || 'localhost'; | ||
|  |     this.port = options.port || 27017; | ||
|  |     this.monitorCommands = | ||
|  |       typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false; | ||
|  |     this.serverApi = options.serverApi; | ||
|  | 
 | ||
|  |     this.closed = false; | ||
|  |     this.destroyed = false; | ||
|  | 
 | ||
|  |     this[kDescription] = new StreamDescription(this.address, options); | ||
|  |     this[kGeneration] = options.generation; | ||
|  |     this[kLastUseTime] = now(); | ||
|  | 
 | ||
|  |     // retain a reference to an `AutoEncrypter` if present
 | ||
|  |     if (options.autoEncrypter) { | ||
|  |       this[kAutoEncrypter] = options.autoEncrypter; | ||
|  |     } | ||
|  | 
 | ||
|  |     // setup parser stream and message handling
 | ||
|  |     this[kQueue] = new Map(); | ||
|  |     this[kMessageStream] = new MessageStream(options); | ||
|  |     this[kMessageStream].on('message', messageHandler(this)); | ||
|  |     this[kStream] = stream; | ||
|  |     stream.on('error', () => { | ||
|  |       /* ignore errors, listen to `close` instead */ | ||
|  |     }); | ||
|  | 
 | ||
|  |     this[kMessageStream].on('error', error => this.handleIssue({ destroy: error })); | ||
|  |     stream.on('close', () => this.handleIssue({ isClose: true })); | ||
|  |     stream.on('timeout', () => this.handleIssue({ isTimeout: true, destroy: true })); | ||
|  | 
 | ||
|  |     // hook the message stream up to the passed in stream
 | ||
|  |     stream.pipe(this[kMessageStream]); | ||
|  |     this[kMessageStream].pipe(stream); | ||
|  |   } | ||
|  | 
 | ||
|  |   get description() { | ||
|  |     return this[kDescription]; | ||
|  |   } | ||
|  | 
 | ||
|  |   get ismaster() { | ||
|  |     return this[kIsMaster]; | ||
|  |   } | ||
|  | 
 | ||
|  |   // the `connect` method stores the result of the handshake ismaster on the connection
 | ||
|  |   set ismaster(response) { | ||
|  |     this[kDescription].receiveResponse(response); | ||
|  | 
 | ||
|  |     // TODO: remove this, and only use the `StreamDescription` in the future
 | ||
|  |     this[kIsMaster] = response; | ||
|  |   } | ||
|  | 
 | ||
|  |   get generation() { | ||
|  |     return this[kGeneration] || 0; | ||
|  |   } | ||
|  | 
 | ||
|  |   get idleTime() { | ||
|  |     return calculateDurationInMs(this[kLastUseTime]); | ||
|  |   } | ||
|  | 
 | ||
|  |   get clusterTime() { | ||
|  |     return this[kClusterTime]; | ||
|  |   } | ||
|  | 
 | ||
|  |   get stream() { | ||
|  |     return this[kStream]; | ||
|  |   } | ||
|  | 
 | ||
|  |   markAvailable() { | ||
|  |     this[kLastUseTime] = now(); | ||
|  |   } | ||
|  | 
 | ||
|  |   /** | ||
|  |    * @param {{ isTimeout?: boolean; isClose?: boolean; destroy?: boolean | Error }} issue | ||
|  |    */ | ||
|  |   handleIssue(issue) { | ||
|  |     if (this.closed) { | ||
|  |       return; | ||
|  |     } | ||
|  | 
 | ||
|  |     if (issue.destroy) { | ||
|  |       this[kStream].destroy(typeof issue.destroy === 'boolean' ? undefined : issue.destroy); | ||
|  |     } | ||
|  | 
 | ||
|  |     this.closed = true; | ||
|  | 
 | ||
|  |     for (const idAndOp of this[kQueue]) { | ||
|  |       const op = idAndOp[1]; | ||
|  |       if (issue.isTimeout) { | ||
|  |         op.cb( | ||
|  |           new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, { | ||
|  |             beforeHandshake: this.ismaster == null | ||
|  |           }) | ||
|  |         ); | ||
|  |       } else if (issue.isClose) { | ||
|  |         op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`)); | ||
|  |       } else { | ||
|  |         op.cb(typeof issue.destroy === 'boolean' ? undefined : issue.destroy); | ||
|  |       } | ||
|  |     } | ||
|  | 
 | ||
|  |     this[kQueue].clear(); | ||
|  |     this.emit('close'); | ||
|  |   } | ||
|  | 
 | ||
|  |   destroy(options, callback) { | ||
|  |     if (typeof options === 'function') { | ||
|  |       callback = options; | ||
|  |       options = {}; | ||
|  |     } | ||
|  | 
 | ||
|  |     options = Object.assign({ force: false }, options); | ||
|  |     if (this[kStream] == null || this.destroyed) { | ||
|  |       this.destroyed = true; | ||
|  |       if (typeof callback === 'function') { | ||
|  |         callback(); | ||
|  |       } | ||
|  | 
 | ||
|  |       return; | ||
|  |     } | ||
|  | 
 | ||
|  |     if (options.force) { | ||
|  |       this[kStream].destroy(); | ||
|  |       this.destroyed = true; | ||
|  |       if (typeof callback === 'function') { | ||
|  |         callback(); | ||
|  |       } | ||
|  | 
 | ||
|  |       return; | ||
|  |     } | ||
|  | 
 | ||
|  |     this[kStream].end(err => { | ||
|  |       this.destroyed = true; | ||
|  |       if (typeof callback === 'function') { | ||
|  |         callback(err); | ||
|  |       } | ||
|  |     }); | ||
|  |   } | ||
|  | 
 | ||
|  |   applyApiVersion(options) { | ||
|  |     if (this.serverApi) { | ||
|  |       options.serverApi = this.serverApi; | ||
|  |     } | ||
|  |     return options; | ||
|  |   } | ||
|  | 
 | ||
|  |   // Wire protocol methods
 | ||
|  |   command(ns, cmd, options, callback) { | ||
|  |     if (typeof options === 'function') { | ||
|  |       callback = options; | ||
|  |       options = {}; | ||
|  |     } | ||
|  |     wp.command(makeServerTrampoline(this), ns, cmd, this.applyApiVersion(options), callback); | ||
|  |   } | ||
|  | 
 | ||
|  |   query(ns, cmd, cursorState, options, callback) { | ||
|  |     wp.query( | ||
|  |       makeServerTrampoline(this), | ||
|  |       ns, | ||
|  |       cmd, | ||
|  |       cursorState, | ||
|  |       this.applyApiVersion(options), | ||
|  |       callback | ||
|  |     ); | ||
|  |   } | ||
|  | 
 | ||
|  |   getMore(ns, cursorState, batchSize, options, callback) { | ||
|  |     wp.getMore( | ||
|  |       makeServerTrampoline(this), | ||
|  |       ns, | ||
|  |       cursorState, | ||
|  |       batchSize, | ||
|  |       this.applyApiVersion(options), | ||
|  |       callback | ||
|  |     ); | ||
|  |   } | ||
|  | 
 | ||
|  |   killCursors(ns, cursorState, callback) { | ||
|  |     wp.killCursors(makeServerTrampoline(this), ns, cursorState, this.applyApiVersion({}), callback); | ||
|  |   } | ||
|  | 
 | ||
|  |   insert(ns, ops, options, callback) { | ||
|  |     wp.insert(makeServerTrampoline(this), ns, ops, this.applyApiVersion(options), callback); | ||
|  |   } | ||
|  | 
 | ||
|  |   update(ns, ops, options, callback) { | ||
|  |     wp.update(makeServerTrampoline(this), ns, ops, this.applyApiVersion(options), callback); | ||
|  |   } | ||
|  | 
 | ||
|  |   remove(ns, ops, options, callback) { | ||
|  |     wp.remove(makeServerTrampoline(this), ns, ops, this.applyApiVersion(options), callback); | ||
|  |   } | ||
|  | } | ||
|  | 
 | ||
|  | /// This lets us emulate a legacy `Server` instance so we can work with the existing wire
 | ||
|  | /// protocol methods. Eventually, the operation executor will return a `Connection` to execute
 | ||
|  | /// against.
 | ||
|  | function makeServerTrampoline(connection) { | ||
|  |   const server = { | ||
|  |     description: connection.description, | ||
|  |     clusterTime: connection[kClusterTime], | ||
|  |     s: { | ||
|  |       bson: connection.bson, | ||
|  |       pool: { write: write.bind(connection), isConnected: () => true } | ||
|  |     } | ||
|  |   }; | ||
|  | 
 | ||
|  |   if (connection[kAutoEncrypter]) { | ||
|  |     server.autoEncrypter = connection[kAutoEncrypter]; | ||
|  |   } | ||
|  | 
 | ||
|  |   return server; | ||
|  | } | ||
|  | 
 | ||
|  | function messageHandler(conn) { | ||
|  |   return function messageHandler(message) { | ||
|  |     // always emit the message, in case we are streaming
 | ||
|  |     conn.emit('message', message); | ||
|  |     if (!conn[kQueue].has(message.responseTo)) { | ||
|  |       return; | ||
|  |     } | ||
|  | 
 | ||
|  |     const operationDescription = conn[kQueue].get(message.responseTo); | ||
|  |     const callback = operationDescription.cb; | ||
|  | 
 | ||
|  |     // SERVER-45775: For exhaust responses we should be able to use the same requestId to
 | ||
|  |     // track response, however the server currently synthetically produces remote requests
 | ||
|  |     // making the `responseTo` change on each response
 | ||
|  |     conn[kQueue].delete(message.responseTo); | ||
|  |     if (message.moreToCome) { | ||
|  |       // requeue the callback for next synthetic request
 | ||
|  |       conn[kQueue].set(message.requestId, operationDescription); | ||
|  |     } else if (operationDescription.socketTimeoutOverride) { | ||
|  |       conn[kStream].setTimeout(conn.socketTimeout); | ||
|  |     } | ||
|  | 
 | ||
|  |     try { | ||
|  |       // Pass in the entire description because it has BSON parsing options
 | ||
|  |       message.parse(operationDescription); | ||
|  |     } catch (err) { | ||
|  |       callback(new MongoError(err)); | ||
|  |       return; | ||
|  |     } | ||
|  | 
 | ||
|  |     if (message.documents[0]) { | ||
|  |       const document = message.documents[0]; | ||
|  |       const session = operationDescription.session; | ||
|  |       if (session) { | ||
|  |         updateSessionFromResponse(session, document); | ||
|  |       } | ||
|  | 
 | ||
|  |       if (document.$clusterTime) { | ||
|  |         conn[kClusterTime] = document.$clusterTime; | ||
|  |         conn.emit('clusterTimeReceived', document.$clusterTime); | ||
|  |       } | ||
|  | 
 | ||
|  |       if (operationDescription.command) { | ||
|  |         if (document.writeConcernError) { | ||
|  |           callback(new MongoWriteConcernError(document.writeConcernError, document)); | ||
|  |           return; | ||
|  |         } | ||
|  | 
 | ||
|  |         if (document.ok === 0 || document.$err || document.errmsg || document.code) { | ||
|  |           callback(new MongoError(document)); | ||
|  |           return; | ||
|  |         } | ||
|  |       } | ||
|  |     } | ||
|  | 
 | ||
|  |     // NODE-2382: reenable in our glorious non-leaky abstraction future
 | ||
|  |     // callback(null, operationDescription.fullResult ? message : message.documents[0]);
 | ||
|  | 
 | ||
|  |     callback( | ||
|  |       undefined, | ||
|  |       new CommandResult( | ||
|  |         operationDescription.fullResult ? message : message.documents[0], | ||
|  |         conn, | ||
|  |         message | ||
|  |       ) | ||
|  |     ); | ||
|  |   }; | ||
|  | } | ||
|  | 
 | ||
|  | function streamIdentifier(stream) { | ||
|  |   if (typeof stream.address === 'function') { | ||
|  |     return `${stream.remoteAddress}:${stream.remotePort}`; | ||
|  |   } | ||
|  | 
 | ||
|  |   return uuidV4().toString('hex'); | ||
|  | } | ||
|  | 
 | ||
|  | // Not meant to be called directly, the wire protocol methods call this assuming it is a `Pool` instance
 | ||
|  | function write(command, options, callback) { | ||
|  |   if (typeof options === 'function') { | ||
|  |     callback = options; | ||
|  |   } | ||
|  | 
 | ||
|  |   options = options || {}; | ||
|  |   const operationDescription = { | ||
|  |     requestId: command.requestId, | ||
|  |     cb: callback, | ||
|  |     session: options.session, | ||
|  |     fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false, | ||
|  |     noResponse: typeof options.noResponse === 'boolean' ? options.noResponse : false, | ||
|  |     documentsReturnedIn: options.documentsReturnedIn, | ||
|  |     command: !!options.command, | ||
|  | 
 | ||
|  |     // for BSON parsing
 | ||
|  |     promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true, | ||
|  |     promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true, | ||
|  |     promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false, | ||
|  |     bsonRegExp: typeof options.bsonRegExp === 'boolean' ? options.bsonRegExp : false, | ||
|  |     raw: typeof options.raw === 'boolean' ? options.raw : false | ||
|  |   }; | ||
|  | 
 | ||
|  |   if (this[kDescription] && this[kDescription].compressor) { | ||
|  |     operationDescription.agreedCompressor = this[kDescription].compressor; | ||
|  | 
 | ||
|  |     if (this[kDescription].zlibCompressionLevel) { | ||
|  |       operationDescription.zlibCompressionLevel = this[kDescription].zlibCompressionLevel; | ||
|  |     } | ||
|  |   } | ||
|  | 
 | ||
|  |   if (typeof options.socketTimeout === 'number') { | ||
|  |     operationDescription.socketTimeoutOverride = true; | ||
|  |     this[kStream].setTimeout(options.socketTimeout); | ||
|  |   } | ||
|  | 
 | ||
|  |   // if command monitoring is enabled we need to modify the callback here
 | ||
|  |   if (this.monitorCommands) { | ||
|  |     this.emit('commandStarted', new apm.CommandStartedEvent(this, command)); | ||
|  | 
 | ||
|  |     operationDescription.started = now(); | ||
|  |     operationDescription.cb = (err, reply) => { | ||
|  |       if (err) { | ||
|  |         this.emit( | ||
|  |           'commandFailed', | ||
|  |           new apm.CommandFailedEvent(this, command, err, operationDescription.started) | ||
|  |         ); | ||
|  |       } else { | ||
|  |         if (reply && reply.result && (reply.result.ok === 0 || reply.result.$err)) { | ||
|  |           this.emit( | ||
|  |             'commandFailed', | ||
|  |             new apm.CommandFailedEvent(this, command, reply.result, operationDescription.started) | ||
|  |           ); | ||
|  |         } else { | ||
|  |           this.emit( | ||
|  |             'commandSucceeded', | ||
|  |             new apm.CommandSucceededEvent(this, command, reply, operationDescription.started) | ||
|  |           ); | ||
|  |         } | ||
|  |       } | ||
|  | 
 | ||
|  |       if (typeof callback === 'function') { | ||
|  |         callback(err, reply); | ||
|  |       } | ||
|  |     }; | ||
|  |   } | ||
|  | 
 | ||
|  |   if (!operationDescription.noResponse) { | ||
|  |     this[kQueue].set(operationDescription.requestId, operationDescription); | ||
|  |   } | ||
|  | 
 | ||
|  |   try { | ||
|  |     this[kMessageStream].writeCommand(command, operationDescription); | ||
|  |   } catch (e) { | ||
|  |     if (!operationDescription.noResponse) { | ||
|  |       this[kQueue].delete(operationDescription.requestId); | ||
|  |       operationDescription.cb(e); | ||
|  |       return; | ||
|  |     } | ||
|  |   } | ||
|  | 
 | ||
|  |   if (operationDescription.noResponse) { | ||
|  |     operationDescription.cb(); | ||
|  |   } | ||
|  | } | ||
|  | 
 | ||
|  | module.exports = { | ||
|  |   Connection | ||
|  | }; |