You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							204 lines
						
					
					
						
							5.6 KiB
						
					
					
				
			
		
		
	
	
							204 lines
						
					
					
						
							5.6 KiB
						
					
					
				'use strict';
 | 
						|
 | 
						|
Object.defineProperty(exports, "__esModule", {
 | 
						|
    value: true
 | 
						|
});
 | 
						|
exports.default = queue;
 | 
						|
 | 
						|
var _baseIndexOf = require('lodash/_baseIndexOf');
 | 
						|
 | 
						|
var _baseIndexOf2 = _interopRequireDefault(_baseIndexOf);
 | 
						|
 | 
						|
var _isArray = require('lodash/isArray');
 | 
						|
 | 
						|
var _isArray2 = _interopRequireDefault(_isArray);
 | 
						|
 | 
						|
var _noop = require('lodash/noop');
 | 
						|
 | 
						|
var _noop2 = _interopRequireDefault(_noop);
 | 
						|
 | 
						|
var _onlyOnce = require('./onlyOnce');
 | 
						|
 | 
						|
var _onlyOnce2 = _interopRequireDefault(_onlyOnce);
 | 
						|
 | 
						|
var _setImmediate = require('./setImmediate');
 | 
						|
 | 
						|
var _setImmediate2 = _interopRequireDefault(_setImmediate);
 | 
						|
 | 
						|
var _DoublyLinkedList = require('./DoublyLinkedList');
 | 
						|
 | 
						|
var _DoublyLinkedList2 = _interopRequireDefault(_DoublyLinkedList);
 | 
						|
 | 
						|
var _wrapAsync = require('./wrapAsync');
 | 
						|
 | 
						|
var _wrapAsync2 = _interopRequireDefault(_wrapAsync);
 | 
						|
 | 
						|
/**
 * Normalises the result of a `require` call so callers can always read
 * `.default` from it.
 *
 * @param {*} obj - The value returned by `require`.
 * @returns {Object} `obj` itself when it is truthy and flagged with a truthy
 *     `__esModule` property; otherwise a new `{ default: obj }` wrapper.
 */
function _interopRequireDefault(obj) {
    if (obj && obj.__esModule) {
        return obj;
    }
    return { default: obj };
}
 | 
						|
 | 
						|
/**
 * Creates a task queue that feeds batches of task data to `worker`, keeping
 * at most `concurrency` worker invocations in flight at any time.
 *
 * @param {Function} worker - Processing function. It is passed through
 *     `wrapAsync` and then invoked as `worker(data, callback)`, where `data`
 *     is an array of task payloads and `callback` is a once-only completion
 *     callback whose arguments are forwarded to every task callback in the
 *     batch.
 * @param {number} [concurrency=1] - Maximum number of simultaneous worker
 *     invocations. `null`/`undefined` defaults to 1.
 * @param {number} [payload] - Maximum number of tasks handed to a single
 *     worker invocation. When falsy, every currently queued task is taken.
 * @returns {Object} The queue object. Its `saturated`, `unsaturated`,
 *     `empty`, `drain` and `error` properties are no-op hooks that callers
 *     may replace.
 * @throws {Error} If `concurrency` is exactly 0, or (from `push`/`unshift`)
 *     if a task callback is supplied that is not a function.
 */
function queue(worker, concurrency, payload) {
    if (concurrency == null) {
        concurrency = 1;
    } else if (concurrency === 0) {
        throw new Error('Concurrency must not be zero');
    }

    var _worker = (0, _wrapAsync2.default)(worker);
    // Number of worker invocations currently in flight.
    var numRunning = 0;
    // Task items that have been handed to a worker and not yet completed.
    var workersList = [];

    // True while a deferred `q.process()` call is already queued, so that
    // several inserts in the same tick schedule only one processing pass.
    var processingScheduled = false;

    /**
     * Adds one task (or an array of tasks) to the queue and schedules a
     * deferred processing pass.
     *
     * @param {*|Array} data - Task payload, or an array of payloads.
     * @param {boolean} insertAtFront - Insert at the head instead of the tail.
     * @param {Function} [callback] - Called when the task's batch completes.
     */
    function _insert(data, insertAtFront, callback) {
        if (callback != null && typeof callback !== 'function') {
            throw new Error('task callback must be a function');
        }
        q.started = true;
        if (!(0, _isArray2.default)(data)) {
            data = [data];
        }
        if (data.length === 0 && q.idle()) {
            // call drain immediately if there are no tasks
            return (0, _setImmediate2.default)(function () {
                q.drain();
            });
        }

        for (var i = 0, l = data.length; i < l; i++) {
            var item = {
                data: data[i],
                callback: callback || _noop2.default
            };

            if (insertAtFront) {
                q._tasks.unshift(item);
            } else {
                q._tasks.push(item);
            }
        }

        if (!processingScheduled) {
            processingScheduled = true;
            (0, _setImmediate2.default)(function () {
                processingScheduled = false;
                q.process();
            });
        }
    }

    /**
     * Builds the completion callback for one batch of tasks.
     *
     * @param {Array} tasks - The task items handed to the worker together.
     * @returns {Function} Callback that releases the worker slot, notifies
     *     each task, fires the queue hooks and resumes processing.
     */
    function _next(tasks) {
        return function (err) {
            numRunning -= 1;

            for (var i = 0, l = tasks.length; i < l; i++) {
                var task = tasks[i];

                // Drop the finished task from the in-flight list.
                var index = (0, _baseIndexOf2.default)(workersList, task, 0);
                if (index === 0) {
                    workersList.shift();
                } else if (index > 0) {
                    workersList.splice(index, 1);
                }

                // Every task in the batch receives the worker's full result.
                task.callback.apply(task, arguments);

                if (err != null) {
                    q.error(err, task.data);
                }
            }

            if (numRunning <= q.concurrency - q.buffer) {
                q.unsaturated();
            }

            if (q.idle()) {
                q.drain();
            }
            q.process();
        };
    }

    // Re-entrancy guard for `q.process()`.
    var isProcessing = false;
    var q = {
        _tasks: new _DoublyLinkedList2.default(),
        concurrency: concurrency,
        payload: payload,
        saturated: _noop2.default,
        unsaturated: _noop2.default,
        // Slack below `concurrency` at which `unsaturated` starts firing.
        buffer: concurrency / 4,
        empty: _noop2.default,
        drain: _noop2.default,
        error: _noop2.default,
        started: false,
        paused: false,
        // Appends task(s) to the tail of the queue.
        push: function (data, callback) {
            _insert(data, false, callback);
        },
        // Discards all waiting tasks and detaches the `drain` hook.
        kill: function () {
            q.drain = _noop2.default;
            q._tasks.empty();
        },
        // Inserts task(s) at the head of the queue.
        unshift: function (data, callback) {
            _insert(data, true, callback);
        },
        // Delegates to the task list's `remove` with the given test function.
        remove: function (testFn) {
            q._tasks.remove(testFn);
        },
        // Starts as many worker invocations as concurrency and the number of
        // waiting tasks allow.
        process: function () {
            // Avoid trying to start too many processing operations. This can occur
            // when callbacks resolve synchronously (#1267).
            if (isProcessing) {
                return;
            }
            isProcessing = true;
            try {
                while (!q.paused && numRunning < q.concurrency && q._tasks.length) {
                    var tasks = [],
                        data = [];
                    var l = q._tasks.length;
                    if (q.payload) l = Math.min(l, q.payload);
                    for (var i = 0; i < l; i++) {
                        var node = q._tasks.shift();
                        tasks.push(node);
                        workersList.push(node);
                        data.push(node.data);
                    }

                    numRunning += 1;

                    if (q._tasks.length === 0) {
                        q.empty();
                    }

                    if (numRunning === q.concurrency) {
                        q.saturated();
                    }

                    var cb = (0, _onlyOnce2.default)(_next(tasks));
                    _worker(data, cb);
                }
            } finally {
                // Always release the guard: if the worker or a hook throws
                // synchronously, leaving the flag set would make every later
                // process() call return immediately and wedge the queue.
                isProcessing = false;
            }
        },
        // Number of tasks still waiting to be handed to a worker.
        length: function () {
            return q._tasks.length;
        },
        // Number of worker invocations currently in flight.
        running: function () {
            return numRunning;
        },
        // The live array of task items currently being worked on.
        workersList: function () {
            return workersList;
        },
        // True when nothing is waiting and nothing is running.
        idle: function () {
            return q._tasks.length + numRunning === 0;
        },
        // Stops new worker invocations from starting; running ones continue.
        pause: function () {
            q.paused = true;
        },
        // Clears the paused flag and schedules a deferred processing pass.
        resume: function () {
            if (q.paused === false) {
                return;
            }
            q.paused = false;
            (0, _setImmediate2.default)(q.process);
        }
    };
    return q;
}
 | 
						|
// CommonJS interop: make the default export the module's export value.
module.exports = exports['default'];