You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					210 lines
				
				6.5 KiB
			
		
		
			
		
	
	
					210 lines
				
				6.5 KiB
			| 
											3 years ago
										 | 'use strict'; | ||
|  | 
 | ||
|  | const applyWriteConcern = require('../utils').applyWriteConcern; | ||
|  | const Code = require('../core').BSON.Code; | ||
|  | const decorateWithCollation = require('../utils').decorateWithCollation; | ||
|  | const decorateWithReadConcern = require('../utils').decorateWithReadConcern; | ||
|  | const executeCommand = require('./db_ops').executeCommand; | ||
|  | const handleCallback = require('../utils').handleCallback; | ||
|  | const isObject = require('../utils').isObject; | ||
|  | const loadDb = require('../dynamic_loaders').loadDb; | ||
|  | const OperationBase = require('./operation').OperationBase; | ||
|  | const ReadPreference = require('../core').ReadPreference; | ||
|  | const toError = require('../utils').toError; | ||
|  | const Aspect = require('./operation').Aspect; | ||
|  | const defineAspects = require('./operation').defineAspects; | ||
|  | const decorateWithExplain = require('../utils').decorateWithExplain; | ||
|  | const maxWireVersion = require('../core/utils').maxWireVersion; | ||
|  | const MongoError = require('../error').MongoError; | ||
|  | 
 | ||
// Option names that MapReduceOperation#execute does not copy verbatim from the
// caller's options onto the mapReduce command document.
const exclusionList = [
  'explain',
  'readPreference',
  'session',
  'bypassDocumentValidation',
  'w',
  'wtimeout',
  'j',
  'writeConcern'
];
|  | 
 | ||
/**
 * Run Map Reduce across a collection. Be aware that the inline option for out will return an array of results not a collection.
 *
 * @class
 * @property {Collection} collection The collection the mapReduce command is issued against.
 * @property {(function|string)} map The mapping function.
 * @property {(function|string)} reduce The reduce function.
 * @property {object} [options] Optional settings. See Collection.prototype.mapReduce for a list of options.
 */
class MapReduceOperation extends OperationBase {
  /**
   * Constructs a MapReduce operation.
   *
   * @param {Collection} collection A Collection instance.
   * @param {(function|string)} map The mapping function.
   * @param {(function|string)} reduce The reduce function.
   * @param {object} [options] Optional settings. See Collection.prototype.mapReduce for a list of options.
   */
  constructor(collection, map, reduce, options) {
    // NOTE(review): `this.options` and `this.explain`, read in execute(), are presumably
    // set up by OperationBase from `options` — confirm against ./operation.
    super(options);

    this.collection = collection;
    this.map = map;
    this.reduce = reduce;
  }

  /**
   * Execute the operation.
   *
   * Builds the mapReduce command from the stored map/reduce functions and options, runs it
   * through executeCommand, and reports the outcome through `callback`:
   *  - explain requested: the raw server result;
   *  - server returned `results` (inline output): the results array, or
   *    `{ results, stats }` when `options.verbose` is truthy;
   *  - otherwise: a Collection wrapping the output collection, or
   *    `{ collection, stats }` when `options.verbose` is truthy.
   *
   * @param {Collection~resultCallback} [callback] The command result callback
   */
  execute(callback) {
    const coll = this.collection;
    let options = this.options;

    let mapCommandHash = {
      mapReduce: coll.collectionName,
      map: this.map,
      reduce: this.reduce
    };

    // Copy the caller's options onto the command. `scope` is converted through
    // processScope; names in the exclusion list are not copied here.
    for (let name in options) {
      if (name === 'scope') {
        mapCommandHash[name] = processScope(options[name]);
      } else if (exclusionList.indexOf(name) === -1) {
        mapCommandHash[name] = options[name];
      }
    }

    // Work on a shallow copy so the assignments below do not modify this.options.
    options = Object.assign({}, options);

    // Ensure we have the right read preference inheritance
    options.readPreference = ReadPreference.resolve(coll, options);

    // When the output is not inline (and the read preference is neither `false` nor the
    // string 'primary'), force a primary read preference and apply the write concern;
    // otherwise decorate the command with the read concern.
    if (
      options.readPreference !== false &&
      options.readPreference !== 'primary' &&
      options.out &&
      options.out.inline !== 1 &&
      options.out !== 'inline'
    ) {
      options.readPreference = 'primary';
      applyWriteConcern(mapCommandHash, { db: coll.s.db, collection: coll }, options);
    } else {
      decorateWithReadConcern(mapCommandHash, coll, options);
    }

    // bypassDocumentValidation is excluded from the generic copy above and is only
    // forwarded when it is exactly `true`.
    if (options.bypassDocumentValidation === true) {
      mapCommandHash.bypassDocumentValidation = options.bypassDocumentValidation;
    }

    // decorateWithCollation may throw; report that through the callback instead.
    try {
      decorateWithCollation(mapCommandHash, coll, options);
    } catch (err) {
      return callback(err, null);
    }

    if (this.explain) {
      if (maxWireVersion(coll.s.topology) < 9) {
        callback(new MongoError(`server does not support explain on mapReduce`));
        return;
      }
      mapCommandHash = decorateWithExplain(mapCommandHash, this.explain);
    }

    // Execute command
    executeCommand(coll.s.db, mapCommandHash, options, (err, result) => {
      if (err) return handleCallback(callback, err);

      // A reply can still describe a failure even though no error was raised.
      if (1 !== result.ok || result.err || result.errmsg) {
        return handleCallback(callback, toError(result));
      }

      // If an explain operation was executed, don't process the server results
      if (this.explain) return callback(undefined, result);

      // Collect whichever statistics the server reported.
      const stats = {};
      if (result.timeMillis) stats['processtime'] = result.timeMillis;
      if (result.counts) stats['counts'] = result.counts;
      if (result.timing) stats['timing'] = result.timing;

      const verbose = !!options.verbose;

      // Inline output: the documents are part of the reply itself.
      if (result.results) {
        if (!verbose) {
          return handleCallback(callback, null, result.results);
        }

        return handleCallback(callback, null, { results: result.results, stats: stats });
      }

      // The returned collection
      let collection = null;

      if (result.result != null && typeof result.result === 'object') {
        // An object result names a collection in another database ({ db, collection }).
        const doc = result.result;
        const Db = loadDb();
        collection = new Db(doc.db, coll.s.db.s.topology, coll.s.db.s.options).collection(
          doc.collection
        );
      } else {
        // Otherwise the result is passed as a collection name within the current database.
        collection = coll.s.db.collection(result.result);
      }

      // `err` is falsy on this path; it is forwarded unchanged.
      if (!verbose) {
        return handleCallback(callback, err, collection);
      }

      handleCallback(callback, err, { collection: collection, stats: stats });
    });
  }
}
|  | 
 | ||
/**
 * Functions that are passed as scope args must
 * be converted to Code instances.
 *
 * Values for which isObject() is false, and values whose `_bsontype` is 'ObjectID',
 * are returned unchanged. Any other object is copied into a new object: function-valued
 * properties become `new Code(String(fn))` and every other property is processed
 * recursively. Keys are visited from last to first, so the copy's keys are inserted in
 * reverse of Object.keys(scope).
 *
 * @ignore
 * @param {*} scope The scope value (or nested value) to convert.
 * @returns {*} The converted copy, or `scope` itself when no conversion applies.
 */
function processScope(scope) {
  if (!isObject(scope) || scope._bsontype === 'ObjectID') {
    return scope;
  }

  const keys = Object.keys(scope);
  const newScope = {};

  for (let i = keys.length - 1; i >= 0; i--) {
    const key = keys[i];
    if ('function' === typeof scope[key]) {
      newScope[key] = new Code(String(scope[key]));
    } else {
      newScope[key] = processScope(scope[key]);
    }
  }

  return newScope;
}
|  | 
 | ||
// Register the EXPLAINABLE aspect on the operation class.
defineAspects(MapReduceOperation, [Aspect.EXPLAINABLE]);

module.exports = MapReduceOperation;