You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							284 lines
						
					
					
						
							5.4 KiB
						
					
					
				
			
		
		
	
	
							284 lines
						
					
					
						
							5.4 KiB
						
					
					
				| 'use strict'
 | |
| 
 | |
| /* eslint-disable no-var */
 | |
| 
 | |
| var reusify = require('reusify')
 | |
| 
 | |
/**
 * Create an in-memory work queue that feeds values to `worker`, keeping at
 * most `concurrency` tasks in flight.
 *
 * Accepted call forms: `fastqueue(context, worker, concurrency)` and
 * `fastqueue(worker, concurrency)` (in which case `context` is null).
 *
 * @param {*} context - `this` value used when calling `worker` and the
 *   per-task callbacks.
 * @param {Function} worker - invoked as `worker.call(context, value, done)`;
 *   a task only completes (and frees its slot) when `done(err, result)` runs.
 * @param {number} concurrency - maximum number of tasks run at the same time.
 * @returns {Object} the queue: `push`, `unshift`, `pause`, `resume`, `idle`,
 *   `length`, `getQueue`, `running`, `kill`, `killAndDrain`, `error`, plus the
 *   assignable hooks `drain`, `saturated`, `empty` and the `paused` /
 *   `concurrency` properties.
 * @throws {Error} when `concurrency` is lower than 1.
 */
function fastqueue (context, worker, concurrency) {
  // Two-argument form: shift the arguments one position to the right.
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  if (concurrency < 1) {
    // A concurrency of exactly 1 is accepted, so the message must say so.
    throw new Error('fastqueue concurrency must be equal to or greater than 1')
  }

  var cache = reusify(Task)
  // Waiting tasks form a singly linked list (via `task.next`).
  var queueHead = null
  var queueTail = null
  // Number of tasks currently handed to the worker.
  var _running = 0
  var errorHandler = null

  var self = {
    push: push,
    drain: noop,
    saturated: noop,
    pause: pause,
    paused: false,
    concurrency: concurrency,
    running: running,
    resume: resume,
    idle: idle,
    length: length,
    getQueue: getQueue,
    unshift: unshift,
    empty: noop,
    kill: kill,
    killAndDrain: killAndDrain,
    error: error
  }

  return self

  // Number of tasks currently being processed by the worker.
  function running () {
    return _running
  }

  // Stop starting new tasks; tasks already handed to the worker keep going.
  function pause () {
    self.paused = true
  }

  // Number of tasks waiting in the queue (walks the linked list).
  function length () {
    var current = queueHead
    var counter = 0

    while (current) {
      current = current.next
      counter++
    }

    return counter
  }

  // Values of the waiting tasks, in the order they would be processed.
  function getQueue () {
    var current = queueHead
    var tasks = []

    while (current) {
      tasks.push(current.value)
      current = current.next
    }

    return tasks
  }

  // Leave the paused state and start as many waiting tasks as there are
  // free slots.
  function resume () {
    if (!self.paused) return
    self.paused = false

    if (queueHead === null) {
      // Nothing is waiting: run a single release cycle so that `drain` fires
      // (once) when no task is running either.
      _running++
      release()
      return
    }

    // Only fill the slots that are actually free. Tasks that were already
    // running when the queue was paused still count against the limit.
    while (queueHead && _running < self.concurrency) {
      _running++
      release()
    }
  }

  // True when nothing is running and nothing is waiting.
  function idle () {
    return _running === 0 && self.length() === 0
  }

  // Take a Task from the pool and fill it in. Shared by push and unshift so
  // both always configure a task the same way (including the error handler).
  function prepare (value, done) {
    var current = cache.get()

    current.context = context
    current.release = release
    current.value = value
    current.callback = done || noop
    current.errorHandler = errorHandler

    return current
  }

  // Add `value` at the end of the queue, or run it right away when a slot is
  // free. `done(err, result)` is optional.
  function push (value, done) {
    var current = prepare(value, done)

    // `>=` (not `===`) so that new tasks are queued even if `self.concurrency`
    // has been lowered below the number of running tasks.
    if (_running >= self.concurrency || self.paused) {
      if (queueTail) {
        queueTail.next = current
        queueTail = current
      } else {
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  // Same as push, but the value goes to the front of the queue.
  function unshift (value, done) {
    var current = prepare(value, done)

    if (_running >= self.concurrency || self.paused) {
      if (queueHead) {
        current.next = queueHead
        queueHead = current
      } else {
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  // Called when a task finishes (with its Task as `holder`) or by resume()
  // (without arguments) to start the next waiting task in the freed slot.
  function release (holder) {
    if (holder) {
      cache.release(holder)
    }
    var next = queueHead
    if (next) {
      if (!self.paused) {
        if (queueTail === queueHead) {
          queueTail = null
        }
        queueHead = next.next
        next.next = null
        // The slot is reused by `next`, so `_running` is left untouched.
        worker.call(context, next.value, next.worked)
        if (queueTail === null) {
          self.empty()
        }
      } else {
        // Paused: give the slot back without starting anything.
        _running--
      }
    } else if (--_running === 0) {
      self.drain()
    }
  }

  // Drop every waiting task and reset the drain hook.
  function kill () {
    queueHead = null
    queueTail = null
    self.drain = noop
  }

  // Drop every waiting task, call the drain hook one last time, then reset it.
  function killAndDrain () {
    queueHead = null
    queueTail = null
    self.drain()
    self.drain = noop
  }

  // Register `handler(err, value)`, called on completion of every task that
  // is pushed or unshifted after this call.
  function error (handler) {
    errorHandler = handler
  }
}
 | |
| 
 | |
// Shared do-nothing function, used as the default for optional callbacks and hooks.
function noop () {
  // intentionally empty
}
 | |
| 
 | |
/**
 * One unit of queued work. Meant to be created with `new`.
 *
 * Fields: `value` (payload passed to the worker), `callback` (completion
 * callback), `next` (link to the following task), `release` (called with the
 * task once it is done), `context` (`this` for `callback`) and
 * `errorHandler` (optional hook called with `(err, value)`).
 */
function Task () {
  var task = this

  task.value = null
  task.callback = noop
  task.next = null
  task.release = noop
  task.context = null
  task.errorHandler = null

  // Completion function given to the worker. It is a closure rather than a
  // method so it can be passed around without binding.
  task.worked = function worked (err, result) {
    // Snapshot what is needed before the fields are cleared.
    var done = task.callback
    var onError = task.errorHandler
    var payload = task.value

    task.value = null
    task.callback = noop

    // The hook runs on every completion; `err` is whatever the worker reported.
    if (onError) {
      onError(err, payload)
    }

    done.call(task.context, err, result)
    task.release(task)
  }
}
 | |
| 
 | |
/**
 * Promise flavour of `fastqueue`: `worker` returns a promise (or thenable),
 * and `push` / `unshift` return a promise for the task's outcome.
 *
 * Accepted call forms: `queueAsPromised(context, worker, concurrency)` and
 * `queueAsPromised(worker, concurrency)` (in which case `context` is null).
 *
 * @param {*} context - `this` value used when calling `worker`.
 * @param {Function} worker - called with the pushed value; must return an
 *   object with a `then` method.
 * @param {number} concurrency - forwarded unchanged to `fastqueue`.
 * @returns {Object} the queue created by `fastqueue`, with `push` and
 *   `unshift` replaced by promise-returning versions and an extra
 *   `drained()` method.
 */
function queueAsPromised (context, worker, concurrency) {
  // Two-argument form: shift the arguments one position to the right.
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  // Adapts the promise-returning worker to the (value, callback) worker
  // contract expected by fastqueue.
  function asyncWrapper (arg, cb) {
    worker.call(this, arg)
      .then(function (res) {
        cb(null, res)
      }, cb)
  }

  var queue = fastqueue(context, asyncWrapper, concurrency)

  // Keep the callback-based originals before they are replaced below.
  var pushCb = queue.push
  var unshiftCb = queue.unshift

  queue.push = push
  queue.unshift = unshift
  queue.drained = drained

  return queue

  // Promise adapter shared by push and unshift: `enqueueCb` is the
  // callback-style function that actually adds the value to the queue.
  function enqueue (enqueueCb, value) {
    var p = new Promise(function (resolve, reject) {
      enqueueCb(value, function (err, result) {
        if (err) {
          reject(err)
          return
        }
        resolve(result)
      })
    })

    // Let's fork the promise chain to
    // make the error bubble up to the user but
    // not lead to a unhandledRejection
    p.catch(noop)

    return p
  }

  // Add `value` at the end of the queue; resolves/rejects with the worker's
  // outcome for that value.
  function push (value) {
    return enqueue(pushCb, value)
  }

  // Add `value` at the front of the queue; resolves/rejects with the worker's
  // outcome for that value.
  function unshift (value) {
    return enqueue(unshiftCb, value)
  }

  // Returns a promise resolved once the queue has no running or waiting tasks.
  function drained () {
    // `drain` only fires when the running count drops to zero, so on an
    // already idle queue waiting for it would never settle.
    if (queue.idle()) {
      return Promise.resolve()
    }

    var previousDrain = queue.drain

    var p = new Promise(function (resolve) {
      queue.drain = function onDrain () {
        previousDrain()
        resolve()
        // Uninstall this one-shot wrapper so repeated drained() calls do not
        // pile up, unless someone replaced the hook in the meantime.
        if (queue.drain === onDrain) {
          queue.drain = previousDrain
        }
      }
    })

    return p
  }
}
 | |
| 
 | |
// Public API: the callback-style factory is the module itself, and the
// promise-based factory is exposed as its `promise` property.
fastqueue.promise = queueAsPromised
module.exports = fastqueue
 |