You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							158 lines
						
					
					
						
							4.2 KiB
						
					
					
				
			
		
		
	
	
							158 lines
						
					
					
						
							4.2 KiB
						
					
					
				'use strict';
 | 
						|
 | 
						|
/*!
 | 
						|
 * Module dependencies.
 | 
						|
 */
 | 
						|
 | 
						|
const immediate = require('../immediate');
 | 
						|
const promiseOrCallback = require('../promiseOrCallback');
 | 
						|
 | 
						|
/**
 | 
						|
 * Execute `fn` for every document in the cursor. If `fn` returns a promise,
 | 
						|
 * will wait for the promise to resolve before iterating on to the next one.
 | 
						|
 * Returns a promise that resolves when done.
 | 
						|
 *
 | 
						|
 * @param {Function} next the thunk to call to get the next document
 | 
						|
 * @param {Function} fn
 | 
						|
 * @param {Object} options
 | 
						|
 * @param {Function} [callback] executed when all docs have been processed
 | 
						|
 * @return {Promise}
 | 
						|
 * @api public
 | 
						|
 * @method eachAsync
 | 
						|
 */
 | 
						|
 | 
						|
module.exports = function eachAsync(next, fn, options, callback) {
 | 
						|
  const parallel = options.parallel || 1;
 | 
						|
  const batchSize = options.batchSize;
 | 
						|
  const enqueue = asyncQueue();
 | 
						|
 | 
						|
  return promiseOrCallback(callback, cb => {
 | 
						|
    if (batchSize != null) {
 | 
						|
      if (typeof batchSize !== 'number') {
 | 
						|
        throw new TypeError('batchSize must be a number');
 | 
						|
      }
 | 
						|
      if (batchSize < 1) {
 | 
						|
        throw new TypeError('batchSize must be at least 1');
 | 
						|
      }
 | 
						|
      if (batchSize !== Math.floor(batchSize)) {
 | 
						|
        throw new TypeError('batchSize must be a positive integer');
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    iterate(cb);
 | 
						|
  });
 | 
						|
 | 
						|
  function iterate(finalCallback) {
 | 
						|
    let drained = false;
 | 
						|
    let handleResultsInProgress = 0;
 | 
						|
    let currentDocumentIndex = 0;
 | 
						|
    let documentsBatch = [];
 | 
						|
 | 
						|
    let error = null;
 | 
						|
    for (let i = 0; i < parallel; ++i) {
 | 
						|
      enqueue(fetch);
 | 
						|
    }
 | 
						|
 | 
						|
    function fetch(done) {
 | 
						|
      if (drained || error) {
 | 
						|
        return done();
 | 
						|
      }
 | 
						|
 | 
						|
      next(function(err, doc) {
 | 
						|
        if (drained || error != null) {
 | 
						|
          return done();
 | 
						|
        }
 | 
						|
        if (err != null) {
 | 
						|
          error = err;
 | 
						|
          finalCallback(err);
 | 
						|
          return done();
 | 
						|
        }
 | 
						|
        if (doc == null) {
 | 
						|
          drained = true;
 | 
						|
          if (handleResultsInProgress <= 0) {
 | 
						|
            finalCallback(null);
 | 
						|
          } else if (batchSize != null && documentsBatch.length) {
 | 
						|
            handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
 | 
						|
          }
 | 
						|
          return done();
 | 
						|
        }
 | 
						|
 | 
						|
        ++handleResultsInProgress;
 | 
						|
 | 
						|
        // Kick off the subsequent `next()` before handling the result, but
 | 
						|
        // make sure we know that we still have a result to handle re: #8422
 | 
						|
        immediate(() => done());
 | 
						|
 | 
						|
        if (batchSize != null) {
 | 
						|
          documentsBatch.push(doc);
 | 
						|
        }
 | 
						|
 | 
						|
        // If the current documents size is less than the provided patch size don't process the documents yet
 | 
						|
        if (batchSize != null && documentsBatch.length !== batchSize) {
 | 
						|
          setTimeout(() => enqueue(fetch), 0);
 | 
						|
          return;
 | 
						|
        }
 | 
						|
 | 
						|
        const docsToProcess = batchSize != null ? documentsBatch : doc;
 | 
						|
 | 
						|
        function handleNextResultCallBack(err) {
 | 
						|
          if (batchSize != null) {
 | 
						|
            handleResultsInProgress -= documentsBatch.length;
 | 
						|
            documentsBatch = [];
 | 
						|
          } else {
 | 
						|
            --handleResultsInProgress;
 | 
						|
          }
 | 
						|
          if (err != null) {
 | 
						|
            error = err;
 | 
						|
            return finalCallback(err);
 | 
						|
          }
 | 
						|
          if (drained && handleResultsInProgress <= 0) {
 | 
						|
            return finalCallback(null);
 | 
						|
          }
 | 
						|
 | 
						|
          setTimeout(() => enqueue(fetch), 0);
 | 
						|
        }
 | 
						|
 | 
						|
        handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack);
 | 
						|
      });
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  function handleNextResult(doc, i, callback) {
 | 
						|
    const promise = fn(doc, i);
 | 
						|
    if (promise && typeof promise.then === 'function') {
 | 
						|
      promise.then(
 | 
						|
        function() { callback(null); },
 | 
						|
        function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); });
 | 
						|
    } else {
 | 
						|
      callback(null);
 | 
						|
    }
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
// `next()` can only execute one at a time, so make sure we always execute
 | 
						|
// `next()` in series, while still allowing multiple `fn()` instances to run
 | 
						|
// in parallel.
 | 
						|
function asyncQueue() {
 | 
						|
  const _queue = [];
 | 
						|
  let inProgress = null;
 | 
						|
  let id = 0;
 | 
						|
 | 
						|
  return function enqueue(fn) {
 | 
						|
    if (_queue.length === 0 && inProgress == null) {
 | 
						|
      inProgress = id++;
 | 
						|
      return fn(_step);
 | 
						|
    }
 | 
						|
    _queue.push(fn);
 | 
						|
  };
 | 
						|
 | 
						|
  function _step() {
 | 
						|
    inProgress = null;
 | 
						|
    if (_queue.length > 0) {
 | 
						|
      inProgress = id++;
 | 
						|
      const fn = _queue.shift();
 | 
						|
      fn(_step);
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 |