You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					158 lines
				
				4.2 KiB
			
		
		
			
		
	
	
					158 lines
				
				4.2 KiB
			| 
											3 years ago
										 | 'use strict'; | ||
|  | 
 | ||
|  | /*! | ||
|  |  * Module dependencies. | ||
|  |  */ | ||
|  | 
 | ||
|  | const immediate = require('../immediate'); | ||
|  | const promiseOrCallback = require('../promiseOrCallback'); | ||
|  | 
 | ||
|  | /** | ||
|  |  * Execute `fn` for every document in the cursor. If `fn` returns a promise, | ||
|  |  * will wait for the promise to resolve before iterating on to the next one. | ||
|  |  * Returns a promise that resolves when done. | ||
|  |  * | ||
|  |  * @param {Function} next the thunk to call to get the next document | ||
|  |  * @param {Function} fn | ||
|  |  * @param {Object} options | ||
|  |  * @param {Function} [callback] executed when all docs have been processed | ||
|  |  * @return {Promise} | ||
|  |  * @api public | ||
|  |  * @method eachAsync | ||
|  |  */ | ||
|  | 
 | ||
|  | module.exports = function eachAsync(next, fn, options, callback) { | ||
|  |   const parallel = options.parallel || 1; | ||
|  |   const batchSize = options.batchSize; | ||
|  |   const enqueue = asyncQueue(); | ||
|  | 
 | ||
|  |   return promiseOrCallback(callback, cb => { | ||
|  |     if (batchSize != null) { | ||
|  |       if (typeof batchSize !== 'number') { | ||
|  |         throw new TypeError('batchSize must be a number'); | ||
|  |       } | ||
|  |       if (batchSize < 1) { | ||
|  |         throw new TypeError('batchSize must be at least 1'); | ||
|  |       } | ||
|  |       if (batchSize !== Math.floor(batchSize)) { | ||
|  |         throw new TypeError('batchSize must be a positive integer'); | ||
|  |       } | ||
|  |     } | ||
|  | 
 | ||
|  |     iterate(cb); | ||
|  |   }); | ||
|  | 
 | ||
|  |   function iterate(finalCallback) { | ||
|  |     let drained = false; | ||
|  |     let handleResultsInProgress = 0; | ||
|  |     let currentDocumentIndex = 0; | ||
|  |     let documentsBatch = []; | ||
|  | 
 | ||
|  |     let error = null; | ||
|  |     for (let i = 0; i < parallel; ++i) { | ||
|  |       enqueue(fetch); | ||
|  |     } | ||
|  | 
 | ||
|  |     function fetch(done) { | ||
|  |       if (drained || error) { | ||
|  |         return done(); | ||
|  |       } | ||
|  | 
 | ||
|  |       next(function(err, doc) { | ||
|  |         if (drained || error != null) { | ||
|  |           return done(); | ||
|  |         } | ||
|  |         if (err != null) { | ||
|  |           error = err; | ||
|  |           finalCallback(err); | ||
|  |           return done(); | ||
|  |         } | ||
|  |         if (doc == null) { | ||
|  |           drained = true; | ||
|  |           if (handleResultsInProgress <= 0) { | ||
|  |             finalCallback(null); | ||
|  |           } else if (batchSize != null && documentsBatch.length) { | ||
|  |             handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack); | ||
|  |           } | ||
|  |           return done(); | ||
|  |         } | ||
|  | 
 | ||
|  |         ++handleResultsInProgress; | ||
|  | 
 | ||
|  |         // Kick off the subsequent `next()` before handling the result, but
 | ||
|  |         // make sure we know that we still have a result to handle re: #8422
 | ||
|  |         immediate(() => done()); | ||
|  | 
 | ||
|  |         if (batchSize != null) { | ||
|  |           documentsBatch.push(doc); | ||
|  |         } | ||
|  | 
 | ||
|  |         // If the current documents size is less than the provided patch size don't process the documents yet
 | ||
|  |         if (batchSize != null && documentsBatch.length !== batchSize) { | ||
|  |           setTimeout(() => enqueue(fetch), 0); | ||
|  |           return; | ||
|  |         } | ||
|  | 
 | ||
|  |         const docsToProcess = batchSize != null ? documentsBatch : doc; | ||
|  | 
 | ||
|  |         function handleNextResultCallBack(err) { | ||
|  |           if (batchSize != null) { | ||
|  |             handleResultsInProgress -= documentsBatch.length; | ||
|  |             documentsBatch = []; | ||
|  |           } else { | ||
|  |             --handleResultsInProgress; | ||
|  |           } | ||
|  |           if (err != null) { | ||
|  |             error = err; | ||
|  |             return finalCallback(err); | ||
|  |           } | ||
|  |           if (drained && handleResultsInProgress <= 0) { | ||
|  |             return finalCallback(null); | ||
|  |           } | ||
|  | 
 | ||
|  |           setTimeout(() => enqueue(fetch), 0); | ||
|  |         } | ||
|  | 
 | ||
|  |         handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack); | ||
|  |       }); | ||
|  |     } | ||
|  |   } | ||
|  | 
 | ||
|  |   function handleNextResult(doc, i, callback) { | ||
|  |     const promise = fn(doc, i); | ||
|  |     if (promise && typeof promise.then === 'function') { | ||
|  |       promise.then( | ||
|  |         function() { callback(null); }, | ||
|  |         function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); }); | ||
|  |     } else { | ||
|  |       callback(null); | ||
|  |     } | ||
|  |   } | ||
|  | }; | ||
|  | 
 | ||
|  | // `next()` can only execute one at a time, so make sure we always execute
 | ||
|  | // `next()` in series, while still allowing multiple `fn()` instances to run
 | ||
|  | // in parallel.
 | ||
|  | function asyncQueue() { | ||
|  |   const _queue = []; | ||
|  |   let inProgress = null; | ||
|  |   let id = 0; | ||
|  | 
 | ||
|  |   return function enqueue(fn) { | ||
|  |     if (_queue.length === 0 && inProgress == null) { | ||
|  |       inProgress = id++; | ||
|  |       return fn(_step); | ||
|  |     } | ||
|  |     _queue.push(fn); | ||
|  |   }; | ||
|  | 
 | ||
|  |   function _step() { | ||
|  |     inProgress = null; | ||
|  |     if (_queue.length > 0) { | ||
|  |       inProgress = id++; | ||
|  |       const fn = _queue.shift(); | ||
|  |       fn(_step); | ||
|  |     } | ||
|  |   } | ||
|  | } |