You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							284 lines
						
					
					
						
							5.4 KiB
						
					
					
				
			
		
		
	
	
							284 lines
						
					
					
						
							5.4 KiB
						
					
					
				'use strict'
 | 
						|
 | 
						|
/* eslint-disable no-var */
 | 
						|
 | 
						|
var reusify = require('reusify')
 | 
						|
 | 
						|
function fastqueue (context, worker, concurrency) {
 | 
						|
  if (typeof context === 'function') {
 | 
						|
    concurrency = worker
 | 
						|
    worker = context
 | 
						|
    context = null
 | 
						|
  }
 | 
						|
 | 
						|
  if (concurrency < 1) {
 | 
						|
    throw new Error('fastqueue concurrency must be greater than 1')
 | 
						|
  }
 | 
						|
 | 
						|
  var cache = reusify(Task)
 | 
						|
  var queueHead = null
 | 
						|
  var queueTail = null
 | 
						|
  var _running = 0
 | 
						|
  var errorHandler = null
 | 
						|
 | 
						|
  var self = {
 | 
						|
    push: push,
 | 
						|
    drain: noop,
 | 
						|
    saturated: noop,
 | 
						|
    pause: pause,
 | 
						|
    paused: false,
 | 
						|
    concurrency: concurrency,
 | 
						|
    running: running,
 | 
						|
    resume: resume,
 | 
						|
    idle: idle,
 | 
						|
    length: length,
 | 
						|
    getQueue: getQueue,
 | 
						|
    unshift: unshift,
 | 
						|
    empty: noop,
 | 
						|
    kill: kill,
 | 
						|
    killAndDrain: killAndDrain,
 | 
						|
    error: error
 | 
						|
  }
 | 
						|
 | 
						|
  return self
 | 
						|
 | 
						|
  function running () {
 | 
						|
    return _running
 | 
						|
  }
 | 
						|
 | 
						|
  function pause () {
 | 
						|
    self.paused = true
 | 
						|
  }
 | 
						|
 | 
						|
  function length () {
 | 
						|
    var current = queueHead
 | 
						|
    var counter = 0
 | 
						|
 | 
						|
    while (current) {
 | 
						|
      current = current.next
 | 
						|
      counter++
 | 
						|
    }
 | 
						|
 | 
						|
    return counter
 | 
						|
  }
 | 
						|
 | 
						|
  function getQueue () {
 | 
						|
    var current = queueHead
 | 
						|
    var tasks = []
 | 
						|
 | 
						|
    while (current) {
 | 
						|
      tasks.push(current.value)
 | 
						|
      current = current.next
 | 
						|
    }
 | 
						|
 | 
						|
    return tasks
 | 
						|
  }
 | 
						|
 | 
						|
  function resume () {
 | 
						|
    if (!self.paused) return
 | 
						|
    self.paused = false
 | 
						|
    for (var i = 0; i < self.concurrency; i++) {
 | 
						|
      _running++
 | 
						|
      release()
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  function idle () {
 | 
						|
    return _running === 0 && self.length() === 0
 | 
						|
  }
 | 
						|
 | 
						|
  function push (value, done) {
 | 
						|
    var current = cache.get()
 | 
						|
 | 
						|
    current.context = context
 | 
						|
    current.release = release
 | 
						|
    current.value = value
 | 
						|
    current.callback = done || noop
 | 
						|
    current.errorHandler = errorHandler
 | 
						|
 | 
						|
    if (_running === self.concurrency || self.paused) {
 | 
						|
      if (queueTail) {
 | 
						|
        queueTail.next = current
 | 
						|
        queueTail = current
 | 
						|
      } else {
 | 
						|
        queueHead = current
 | 
						|
        queueTail = current
 | 
						|
        self.saturated()
 | 
						|
      }
 | 
						|
    } else {
 | 
						|
      _running++
 | 
						|
      worker.call(context, current.value, current.worked)
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  function unshift (value, done) {
 | 
						|
    var current = cache.get()
 | 
						|
 | 
						|
    current.context = context
 | 
						|
    current.release = release
 | 
						|
    current.value = value
 | 
						|
    current.callback = done || noop
 | 
						|
 | 
						|
    if (_running === self.concurrency || self.paused) {
 | 
						|
      if (queueHead) {
 | 
						|
        current.next = queueHead
 | 
						|
        queueHead = current
 | 
						|
      } else {
 | 
						|
        queueHead = current
 | 
						|
        queueTail = current
 | 
						|
        self.saturated()
 | 
						|
      }
 | 
						|
    } else {
 | 
						|
      _running++
 | 
						|
      worker.call(context, current.value, current.worked)
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  function release (holder) {
 | 
						|
    if (holder) {
 | 
						|
      cache.release(holder)
 | 
						|
    }
 | 
						|
    var next = queueHead
 | 
						|
    if (next) {
 | 
						|
      if (!self.paused) {
 | 
						|
        if (queueTail === queueHead) {
 | 
						|
          queueTail = null
 | 
						|
        }
 | 
						|
        queueHead = next.next
 | 
						|
        next.next = null
 | 
						|
        worker.call(context, next.value, next.worked)
 | 
						|
        if (queueTail === null) {
 | 
						|
          self.empty()
 | 
						|
        }
 | 
						|
      } else {
 | 
						|
        _running--
 | 
						|
      }
 | 
						|
    } else if (--_running === 0) {
 | 
						|
      self.drain()
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  function kill () {
 | 
						|
    queueHead = null
 | 
						|
    queueTail = null
 | 
						|
    self.drain = noop
 | 
						|
  }
 | 
						|
 | 
						|
  function killAndDrain () {
 | 
						|
    queueHead = null
 | 
						|
    queueTail = null
 | 
						|
    self.drain()
 | 
						|
    self.drain = noop
 | 
						|
  }
 | 
						|
 | 
						|
  function error (handler) {
 | 
						|
    errorHandler = handler
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
function noop () {}
 | 
						|
 | 
						|
function Task () {
 | 
						|
  this.value = null
 | 
						|
  this.callback = noop
 | 
						|
  this.next = null
 | 
						|
  this.release = noop
 | 
						|
  this.context = null
 | 
						|
  this.errorHandler = null
 | 
						|
 | 
						|
  var self = this
 | 
						|
 | 
						|
  this.worked = function worked (err, result) {
 | 
						|
    var callback = self.callback
 | 
						|
    var errorHandler = self.errorHandler
 | 
						|
    var val = self.value
 | 
						|
    self.value = null
 | 
						|
    self.callback = noop
 | 
						|
    if (self.errorHandler) {
 | 
						|
      errorHandler(err, val)
 | 
						|
    }
 | 
						|
    callback.call(self.context, err, result)
 | 
						|
    self.release(self)
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
function queueAsPromised (context, worker, concurrency) {
 | 
						|
  if (typeof context === 'function') {
 | 
						|
    concurrency = worker
 | 
						|
    worker = context
 | 
						|
    context = null
 | 
						|
  }
 | 
						|
 | 
						|
  function asyncWrapper (arg, cb) {
 | 
						|
    worker.call(this, arg)
 | 
						|
      .then(function (res) {
 | 
						|
        cb(null, res)
 | 
						|
      }, cb)
 | 
						|
  }
 | 
						|
 | 
						|
  var queue = fastqueue(context, asyncWrapper, concurrency)
 | 
						|
 | 
						|
  var pushCb = queue.push
 | 
						|
  var unshiftCb = queue.unshift
 | 
						|
 | 
						|
  queue.push = push
 | 
						|
  queue.unshift = unshift
 | 
						|
  queue.drained = drained
 | 
						|
 | 
						|
  return queue
 | 
						|
 | 
						|
  function push (value) {
 | 
						|
    var p = new Promise(function (resolve, reject) {
 | 
						|
      pushCb(value, function (err, result) {
 | 
						|
        if (err) {
 | 
						|
          reject(err)
 | 
						|
          return
 | 
						|
        }
 | 
						|
        resolve(result)
 | 
						|
      })
 | 
						|
    })
 | 
						|
 | 
						|
    // Let's fork the promise chain to
 | 
						|
    // make the error bubble up to the user but
 | 
						|
    // not lead to a unhandledRejection
 | 
						|
    p.catch(noop)
 | 
						|
 | 
						|
    return p
 | 
						|
  }
 | 
						|
 | 
						|
  function unshift (value) {
 | 
						|
    var p = new Promise(function (resolve, reject) {
 | 
						|
      unshiftCb(value, function (err, result) {
 | 
						|
        if (err) {
 | 
						|
          reject(err)
 | 
						|
          return
 | 
						|
        }
 | 
						|
        resolve(result)
 | 
						|
      })
 | 
						|
    })
 | 
						|
 | 
						|
    // Let's fork the promise chain to
 | 
						|
    // make the error bubble up to the user but
 | 
						|
    // not lead to a unhandledRejection
 | 
						|
    p.catch(noop)
 | 
						|
 | 
						|
    return p
 | 
						|
  }
 | 
						|
 | 
						|
  function drained () {
 | 
						|
    var previousDrain = queue.drain
 | 
						|
 | 
						|
    var p = new Promise(function (resolve) {
 | 
						|
      queue.drain = function () {
 | 
						|
        previousDrain()
 | 
						|
        resolve()
 | 
						|
      }
 | 
						|
    })
 | 
						|
 | 
						|
    return p
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
module.exports = fastqueue
 | 
						|
module.exports.promise = queueAsPromised
 |