You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					284 lines
				
				5.4 KiB
			
		
		
			
		
	
	
					284 lines
				
				5.4 KiB
			| 
											3 years ago
										 | 'use strict' | ||
|  | 
 | ||
|  | /* eslint-disable no-var */ | ||
|  | 
 | ||
|  | var reusify = require('reusify') | ||
|  | 
 | ||
|  | function fastqueue (context, worker, concurrency) { | ||
|  |   if (typeof context === 'function') { | ||
|  |     concurrency = worker | ||
|  |     worker = context | ||
|  |     context = null | ||
|  |   } | ||
|  | 
 | ||
|  |   if (concurrency < 1) { | ||
|  |     throw new Error('fastqueue concurrency must be greater than 1') | ||
|  |   } | ||
|  | 
 | ||
|  |   var cache = reusify(Task) | ||
|  |   var queueHead = null | ||
|  |   var queueTail = null | ||
|  |   var _running = 0 | ||
|  |   var errorHandler = null | ||
|  | 
 | ||
|  |   var self = { | ||
|  |     push: push, | ||
|  |     drain: noop, | ||
|  |     saturated: noop, | ||
|  |     pause: pause, | ||
|  |     paused: false, | ||
|  |     concurrency: concurrency, | ||
|  |     running: running, | ||
|  |     resume: resume, | ||
|  |     idle: idle, | ||
|  |     length: length, | ||
|  |     getQueue: getQueue, | ||
|  |     unshift: unshift, | ||
|  |     empty: noop, | ||
|  |     kill: kill, | ||
|  |     killAndDrain: killAndDrain, | ||
|  |     error: error | ||
|  |   } | ||
|  | 
 | ||
|  |   return self | ||
|  | 
 | ||
|  |   function running () { | ||
|  |     return _running | ||
|  |   } | ||
|  | 
 | ||
|  |   function pause () { | ||
|  |     self.paused = true | ||
|  |   } | ||
|  | 
 | ||
|  |   function length () { | ||
|  |     var current = queueHead | ||
|  |     var counter = 0 | ||
|  | 
 | ||
|  |     while (current) { | ||
|  |       current = current.next | ||
|  |       counter++ | ||
|  |     } | ||
|  | 
 | ||
|  |     return counter | ||
|  |   } | ||
|  | 
 | ||
|  |   function getQueue () { | ||
|  |     var current = queueHead | ||
|  |     var tasks = [] | ||
|  | 
 | ||
|  |     while (current) { | ||
|  |       tasks.push(current.value) | ||
|  |       current = current.next | ||
|  |     } | ||
|  | 
 | ||
|  |     return tasks | ||
|  |   } | ||
|  | 
 | ||
|  |   function resume () { | ||
|  |     if (!self.paused) return | ||
|  |     self.paused = false | ||
|  |     for (var i = 0; i < self.concurrency; i++) { | ||
|  |       _running++ | ||
|  |       release() | ||
|  |     } | ||
|  |   } | ||
|  | 
 | ||
|  |   function idle () { | ||
|  |     return _running === 0 && self.length() === 0 | ||
|  |   } | ||
|  | 
 | ||
|  |   function push (value, done) { | ||
|  |     var current = cache.get() | ||
|  | 
 | ||
|  |     current.context = context | ||
|  |     current.release = release | ||
|  |     current.value = value | ||
|  |     current.callback = done || noop | ||
|  |     current.errorHandler = errorHandler | ||
|  | 
 | ||
|  |     if (_running === self.concurrency || self.paused) { | ||
|  |       if (queueTail) { | ||
|  |         queueTail.next = current | ||
|  |         queueTail = current | ||
|  |       } else { | ||
|  |         queueHead = current | ||
|  |         queueTail = current | ||
|  |         self.saturated() | ||
|  |       } | ||
|  |     } else { | ||
|  |       _running++ | ||
|  |       worker.call(context, current.value, current.worked) | ||
|  |     } | ||
|  |   } | ||
|  | 
 | ||
|  |   function unshift (value, done) { | ||
|  |     var current = cache.get() | ||
|  | 
 | ||
|  |     current.context = context | ||
|  |     current.release = release | ||
|  |     current.value = value | ||
|  |     current.callback = done || noop | ||
|  | 
 | ||
|  |     if (_running === self.concurrency || self.paused) { | ||
|  |       if (queueHead) { | ||
|  |         current.next = queueHead | ||
|  |         queueHead = current | ||
|  |       } else { | ||
|  |         queueHead = current | ||
|  |         queueTail = current | ||
|  |         self.saturated() | ||
|  |       } | ||
|  |     } else { | ||
|  |       _running++ | ||
|  |       worker.call(context, current.value, current.worked) | ||
|  |     } | ||
|  |   } | ||
|  | 
 | ||
|  |   function release (holder) { | ||
|  |     if (holder) { | ||
|  |       cache.release(holder) | ||
|  |     } | ||
|  |     var next = queueHead | ||
|  |     if (next) { | ||
|  |       if (!self.paused) { | ||
|  |         if (queueTail === queueHead) { | ||
|  |           queueTail = null | ||
|  |         } | ||
|  |         queueHead = next.next | ||
|  |         next.next = null | ||
|  |         worker.call(context, next.value, next.worked) | ||
|  |         if (queueTail === null) { | ||
|  |           self.empty() | ||
|  |         } | ||
|  |       } else { | ||
|  |         _running-- | ||
|  |       } | ||
|  |     } else if (--_running === 0) { | ||
|  |       self.drain() | ||
|  |     } | ||
|  |   } | ||
|  | 
 | ||
|  |   function kill () { | ||
|  |     queueHead = null | ||
|  |     queueTail = null | ||
|  |     self.drain = noop | ||
|  |   } | ||
|  | 
 | ||
|  |   function killAndDrain () { | ||
|  |     queueHead = null | ||
|  |     queueTail = null | ||
|  |     self.drain() | ||
|  |     self.drain = noop | ||
|  |   } | ||
|  | 
 | ||
|  |   function error (handler) { | ||
|  |     errorHandler = handler | ||
|  |   } | ||
|  | } | ||
|  | 
 | ||
|  | function noop () {} | ||
|  | 
 | ||
|  | function Task () { | ||
|  |   this.value = null | ||
|  |   this.callback = noop | ||
|  |   this.next = null | ||
|  |   this.release = noop | ||
|  |   this.context = null | ||
|  |   this.errorHandler = null | ||
|  | 
 | ||
|  |   var self = this | ||
|  | 
 | ||
|  |   this.worked = function worked (err, result) { | ||
|  |     var callback = self.callback | ||
|  |     var errorHandler = self.errorHandler | ||
|  |     var val = self.value | ||
|  |     self.value = null | ||
|  |     self.callback = noop | ||
|  |     if (self.errorHandler) { | ||
|  |       errorHandler(err, val) | ||
|  |     } | ||
|  |     callback.call(self.context, err, result) | ||
|  |     self.release(self) | ||
|  |   } | ||
|  | } | ||
|  | 
 | ||
|  | function queueAsPromised (context, worker, concurrency) { | ||
|  |   if (typeof context === 'function') { | ||
|  |     concurrency = worker | ||
|  |     worker = context | ||
|  |     context = null | ||
|  |   } | ||
|  | 
 | ||
|  |   function asyncWrapper (arg, cb) { | ||
|  |     worker.call(this, arg) | ||
|  |       .then(function (res) { | ||
|  |         cb(null, res) | ||
|  |       }, cb) | ||
|  |   } | ||
|  | 
 | ||
|  |   var queue = fastqueue(context, asyncWrapper, concurrency) | ||
|  | 
 | ||
|  |   var pushCb = queue.push | ||
|  |   var unshiftCb = queue.unshift | ||
|  | 
 | ||
|  |   queue.push = push | ||
|  |   queue.unshift = unshift | ||
|  |   queue.drained = drained | ||
|  | 
 | ||
|  |   return queue | ||
|  | 
 | ||
|  |   function push (value) { | ||
|  |     var p = new Promise(function (resolve, reject) { | ||
|  |       pushCb(value, function (err, result) { | ||
|  |         if (err) { | ||
|  |           reject(err) | ||
|  |           return | ||
|  |         } | ||
|  |         resolve(result) | ||
|  |       }) | ||
|  |     }) | ||
|  | 
 | ||
|  |     // Let's fork the promise chain to
 | ||
|  |     // make the error bubble up to the user but
 | ||
|  |     // not lead to a unhandledRejection
 | ||
|  |     p.catch(noop) | ||
|  | 
 | ||
|  |     return p | ||
|  |   } | ||
|  | 
 | ||
|  |   function unshift (value) { | ||
|  |     var p = new Promise(function (resolve, reject) { | ||
|  |       unshiftCb(value, function (err, result) { | ||
|  |         if (err) { | ||
|  |           reject(err) | ||
|  |           return | ||
|  |         } | ||
|  |         resolve(result) | ||
|  |       }) | ||
|  |     }) | ||
|  | 
 | ||
|  |     // Let's fork the promise chain to
 | ||
|  |     // make the error bubble up to the user but
 | ||
|  |     // not lead to a unhandledRejection
 | ||
|  |     p.catch(noop) | ||
|  | 
 | ||
|  |     return p | ||
|  |   } | ||
|  | 
 | ||
|  |   function drained () { | ||
|  |     var previousDrain = queue.drain | ||
|  | 
 | ||
|  |     var p = new Promise(function (resolve) { | ||
|  |       queue.drain = function () { | ||
|  |         previousDrain() | ||
|  |         resolve() | ||
|  |       } | ||
|  |     }) | ||
|  | 
 | ||
|  |     return p | ||
|  |   } | ||
|  | } | ||
|  | 
 | ||
|  | module.exports = fastqueue | ||
|  | module.exports.promise = queueAsPromised |