You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					567 lines
				
				11 KiB
			
		
		
			
		
	
	
					567 lines
				
				11 KiB
			| 
											3 years ago
										 | 'use strict' | ||
|  | 
 | ||
|  | /* eslint-disable no-var */ | ||
|  | 
 | ||
|  | var test = require('tape') | ||
|  | var buildQueue = require('../') | ||
|  | 
 | ||
// Building a queue with 0 as the concurrency argument must throw; 1 must not.
test('concurrency', function (t) {
  // Trivial worker that completes synchronously; nothing is pushed in this test.
  function worker (arg, cb) {
    cb(null, true)
  }

  var buildWithZero = buildQueue.bind(null, worker, 0)
  var buildWithOne = buildQueue.bind(null, worker, 1)

  t.plan(2)
  t.throws(buildWithZero)
  t.doesNotThrow(buildWithOne)
})
|  | 
 | ||
// A pushed task reaches the worker, and the worker's result reaches the
// callback that was passed to push().
test('worker execution', function (t) {
  t.plan(3)

  // Checks the task it receives, then completes synchronously with `true`.
  function worker (task, next) {
    t.equal(task, 42)
    next(null, true)
  }

  var q = buildQueue(worker, 1)

  q.push(42, function (err, outcome) {
    t.error(err, 'no error')
    t.equal(outcome, true, 'result matches')
  })
})
|  | 
 | ||
// With a concurrency of 1, results must come back in push order even though
// the first task (10ms timer) takes longer than the second (0ms timer).
test('limit', function (t) {
  t.plan(4)

  var pending = [10, 0]

  // Uses the task value as the timer delay and echoes it back as the result.
  function worker (delay, cb) {
    setTimeout(cb, delay, null, delay)
  }

  // Shared completion callback: each result must match the next pending value.
  function result (err, value) {
    t.error(err, 'no error')
    t.equal(value, pending.shift(), 'the result matches')
  }

  var q = buildQueue(worker, 1)

  q.push(10, result)
  q.push(0, result)
})
|  | 
 | ||
// Five tasks pushed up front are handed to the worker one by one, in order,
// and each completion callback sees the matching result.
test('multiple executions', function (t) {
  t.plan(15)

  var tasks = [1, 2, 3, 4, 5]
  var started = 0 // how many tasks the worker has been handed so far
  var q = buildQueue(worker, 1)
  var i

  // Verifies arrival order, then completes on a later tick with the same value.
  function worker (arg, cb) {
    t.equal(arg, tasks[started], 'arg matches')
    started++
    setImmediate(cb, null, arg)
  }

  function done (err, result) {
    t.error(err, 'no error')
    // `started` has already been incremented by the worker, hence the -1.
    t.equal(result, tasks[started - 1], 'the result matches')
  }

  for (i = 0; i < tasks.length; i++) {
    q.push(tasks[i], done)
  }
})
|  | 
 | ||
// Same as 'multiple executions', but each task is pushed from the completion
// callback of the previous one instead of all being pushed up front.
test('multiple executions, one after another', function (t) {
  t.plan(15)

  var tasks = [1, 2, 3, 4, 5]
  var started = 0 // how many tasks the worker has been handed so far
  var q = buildQueue(worker, 1)

  // Verifies arrival order, then completes on a later tick with the same value.
  function worker (arg, cb) {
    t.equal(arg, tasks[started], 'arg matches')
    started++
    setImmediate(cb, null, arg)
  }

  function done (err, result) {
    t.error(err, 'no error')
    t.equal(result, tasks[started - 1], 'the result matches')

    // Chain the next task until every entry has been submitted.
    if (started < tasks.length) {
      q.push(tasks[started], done)
    }
  }

  q.push(tasks[0], done)
})
|  | 
 | ||
// When an object is passed as the first argument to buildQueue, both the
// worker and the push callback must be invoked with it as `this`.
test('set this', function (t) {
  t.plan(3)

  var ctx = {}

  // Must stay a regular function: the assertion inspects `this`.
  function worker (arg, cb) {
    t.equal(this, ctx, 'this matches')
    cb(null, true)
  }

  var q = buildQueue(ctx, worker, 1)

  // Regular function expression on purpose, for the same `this` check.
  q.push(42, function (err, result) {
    t.error(err, 'no error')
    t.equal(this, ctx, 'this matches')
  })
})
|  | 
 | ||
// The `drain` hook is called, and by then the worker has run.
test('drain', function (t) {
  t.plan(4)

  var hasWorked = false

  // Records that it ran, then completes on a later tick.
  function worker (arg, cb) {
    t.equal(arg, 42)
    hasWorked = true
    setImmediate(cb, null, true)
  }

  var q = buildQueue(worker, 1)

  q.push(42, function (err, outcome) {
    t.error(err, 'no error')
    t.equal(outcome, true, 'result matches')
  })

  // Installed after the push, as in the original ordering of this test.
  q.drain = function () {
    t.equal(true, hasWorked, 'drained')
  }
})
|  | 
 | ||
// A paused queue holds pushed tasks back until resume() is called.
test('pause && resume', function (t) {
  t.plan(7)

  var hasWorked = false

  // Completes synchronously and records that it ran.
  function worker (arg, cb) {
    t.equal(arg, 42)
    hasWorked = true
    cb(null, true)
  }

  var q = buildQueue(worker, 1)

  t.notOk(q.paused, 'it should not be paused')

  q.pause()

  q.push(42, function (err, outcome) {
    t.error(err, 'no error')
    t.equal(outcome, true, 'result matches')
  })

  // The worker must not have been invoked while the queue is paused.
  t.notOk(hasWorked, 'it should be paused')
  t.ok(q.paused, 'it should be paused')

  q.resume()
  q.resume() // second resume is a no-op

  t.notOk(q.paused, 'it should not be paused')
})
|  | 
 | ||
// Pausing while a task is already running: the first callback sees the queue
// paused and resumes it on the next tick; the second task then runs with the
// queue no longer paused.
test('pause in flight && resume', function (t) {
  t.plan(9)

  var order = [42, 24]

  // Tasks must arrive in push order; completion is deferred to the next tick.
  function worker (arg, cb) {
    t.equal(arg, order.shift())
    process.nextTick(function () { cb(null, true) })
  }

  var q = buildQueue(worker, 1)

  t.notOk(q.paused, 'it should not be paused')

  q.push(42, function (err, outcome) {
    t.error(err, 'no error')
    t.equal(outcome, true, 'result matches')
    t.ok(q.paused, 'it should be paused')
    process.nextTick(function () { q.resume() })
  })

  q.push(24, function (err, outcome) {
    t.error(err, 'no error')
    t.equal(outcome, true, 'result matches')
    t.notOk(q.paused, 'it should not be paused')
  })

  // Pause only after both pushes, i.e. once the first task has been handed out.
  q.pause()
})
|  | 
 | ||
// Raising `concurrency` on a paused queue takes effect on resume(): both
// queued tasks are reported as running at the same time.
test('altering concurrency', function (t) {
  t.plan(7)

  var finished = 0 // number of worker completions so far

  // Both invocations must start before either has completed.
  function worker (arg, cb) {
    t.equal(0, finished, 'works in parallel')
    setImmediate(function () {
      finished++
      cb(null, true)
    })
  }

  function workDone (err, outcome) {
    t.error(err, 'no error')
    t.equal(outcome, true, 'result matches')
  }

  var q = buildQueue(worker, 1)

  q.pause()

  q.push(24, workDone)
  q.push(24, workDone)

  q.concurrency = 2

  q.resume()

  t.equal(q.running(), 2, '2 jobs running')
})
|  | 
 | ||
// idle() is true before anything is pushed, false while tasks are queued or
// running, and true again once the last callback has returned.
test('idle()', function (t) {
  t.plan(12)

  // While a task is being worked on, the queue must not report idle.
  function worker (arg, cb) {
    t.notOk(q.idle(), 'queue is not idle')
    t.equal(arg, 42)
    setImmediate(cb, null, true)
  }

  var q = buildQueue(worker, 1)

  t.ok(q.idle(), 'queue is idle')

  q.push(42, function (err, outcome) {
    t.error(err, 'no error')
    t.equal(outcome, true, 'result matches')
    t.notOk(q.idle(), 'queue is not idle')
  })

  q.push(42, function (err, outcome) {
    t.error(err, 'no error')
    t.equal(outcome, true, 'result matches')
    // it will go idle after executing this function
    setImmediate(function () {
      t.ok(q.idle(), 'queue is now idle')
    })
  })

  t.notOk(q.idle(), 'queue is not idle')
})
|  | 
 | ||
// The `saturated` hook fires once one task has started and none has finished.
test('saturated', function (t) {
  t.plan(9)

  var startedCount = 0 // tasks handed to the worker
  var finishedCount = 0 // tasks whose deferred completion has run

  function worker (arg, cb) {
    t.equal(arg, 42)
    startedCount++
    setImmediate(function () {
      finishedCount++
      cb(null, true)
    })
  }

  function done (err, outcome) {
    t.error(err, 'no error')
    t.equal(outcome, true, 'result matches')
  }

  var q = buildQueue(worker, 1)

  // The plan of 9 only balances if this hook runs exactly once.
  q.saturated = function () {
    t.pass('saturated')
    t.equal(startedCount, 1, 'started 1 task')
    t.equal(finishedCount, 0, 'worked zero task')
  }

  q.push(42, done)
  q.push(42, done)
})
|  | 
 | ||
// length() counts waiting tasks only: the first pushed task is asserted not
// to be counted, and every further push adds one.
test('length', function (t) {
  t.plan(7)

  // Completes on a later tick so that subsequent pushes have to wait.
  function worker (arg, cb) {
    setImmediate(function () {
      cb(null, true)
    })
  }

  function done (err, result) {
    t.error(err, 'no error')
  }

  var q = buildQueue(worker, 1)

  t.equal(q.length(), 0, 'nothing waiting')
  q.push(42, done)
  t.equal(q.length(), 0, 'nothing waiting')
  q.push(42, done)
  t.equal(q.length(), 1, 'one task waiting')
  q.push(42, done)
  t.equal(q.length(), 2, 'two tasks waiting')
})
|  | 
 | ||
// getQueue() returns the waiting task values, in push order, excluding the
// first task pushed.
test('getQueue', function (t) {
  t.plan(10)

  // Completes on a later tick so that subsequent pushes have to wait.
  function worker (arg, cb) {
    setImmediate(function () {
      cb(null, true)
    })
  }

  function done (err, result) {
    t.error(err, 'no error')
  }

  var q = buildQueue(worker, 1)

  t.equal(q.getQueue().length, 0, 'nothing waiting')

  q.push(42, done)
  t.equal(q.getQueue().length, 0, 'nothing waiting')

  q.push(42, done)
  t.equal(q.getQueue().length, 1, 'one task waiting')
  t.equal(q.getQueue()[0], 42, 'should be equal')

  q.push(43, done)
  t.equal(q.getQueue().length, 2, 'two tasks waiting')
  t.equal(q.getQueue()[0], 42, 'should be equal')
  t.equal(q.getQueue()[1], 43, 'should be equal')
})
|  | 
 | ||
// unshift() places a task at the front of the waiting list, so the worker
// sees 1 (pushed first), then 2, 3 (unshifted), then 4 (pushed).
test('unshift', function (t) {
  t.plan(8)

  var order = [1, 2, 3, 4]

  function worker (arg, cb) {
    t.equal(order.shift(), arg, 'tasks come in order')
    setImmediate(function () {
      cb(null, true)
    })
  }

  function done (err, result) {
    t.error(err, 'no error')
  }

  var q = buildQueue(worker, 1)

  q.push(1, done)
  q.push(4, done)
  q.unshift(3, done)
  q.unshift(2, done)
})
|  | 
 | ||
// After unshift() on a paused queue and a resume(), the `empty` hook runs
// before the task's completion callback.
test('unshift && empty', function (t) {
  t.plan(2)

  var isCompleted = false

  function worker (arg, cb) {
    setImmediate(function () {
      cb(null, true)
    })
  }

  function done (err, result) {
    isCompleted = true
    t.error(err, 'no error')
  }

  var q = buildQueue(worker, 1)

  q.pause()

  q.empty = function () {
    t.notOk(isCompleted, 'the task has not completed yet')
  }

  q.unshift(1, done)

  q.resume()
})
|  | 
 | ||
// After push() on a paused queue and a resume(), the `empty` hook runs
// before the task's completion callback.
test('push && empty', function (t) {
  t.plan(2)

  var isCompleted = false

  function worker (arg, cb) {
    setImmediate(function () {
      cb(null, true)
    })
  }

  function done (err, result) {
    isCompleted = true
    t.error(err, 'no error')
  }

  var q = buildQueue(worker, 1)

  q.pause()

  q.empty = function () {
    t.notOk(isCompleted, 'the task has not completed yet')
  }

  q.push(1, done)

  q.resume()
})
|  | 
 | ||
// After kill(), only the task that already reached the worker completes;
// the custom drain hook is never called and `drain` is back to its default.
test('kill', function (t) {
  t.plan(5)

  var order = [1] // the only task expected to reach the worker

  function worker (arg, cb) {
    t.equal(order.shift(), arg, 'tasks come in order')
    setImmediate(function () {
      cb(null, true)
    })
  }

  function done (err, result) {
    t.error(err, 'no error')
    // Inspect the queue one tick later, after this callback has returned.
    setImmediate(function () {
      t.equal(q.length(), 0, 'no queued tasks')
      t.equal(q.running(), 0, 'no running tasks')
      t.equal(q.drain, defaultDrain, 'drain is back to default')
    })
  }

  var q = buildQueue(worker, 1)
  var defaultDrain = q.drain // captured before it is overridden below

  q.drain = function drain () {
    t.fail('drain should never be called')
  }

  q.push(1, done)
  q.push(4, done)
  q.unshift(3, done)
  q.unshift(2, done)
  q.kill()
})
|  | 
 | ||
// Like 'kill', except that killAndDrain() is expected to invoke the custom
// drain hook once; `drain` is still back to its default afterwards.
test('killAndDrain', function (t) {
  t.plan(6)

  var order = [1] // the only task expected to reach the worker

  function worker (arg, cb) {
    t.equal(order.shift(), arg, 'tasks come in order')
    setImmediate(function () {
      cb(null, true)
    })
  }

  function done (err, result) {
    t.error(err, 'no error')
    // Inspect the queue one tick later, after this callback has returned.
    setImmediate(function () {
      t.equal(q.length(), 0, 'no queued tasks')
      t.equal(q.running(), 0, 'no running tasks')
      t.equal(q.drain, defaultDrain, 'drain is back to default')
    })
  }

  var q = buildQueue(worker, 1)
  var defaultDrain = q.drain // captured before it is overridden below

  q.drain = function drain () {
    t.pass('drain has been called')
  }

  q.push(1, done)
  q.push(4, done)
  q.unshift(3, done)
  q.unshift(2, done)
  q.killAndDrain()
})
|  | 
 | ||
// idle() is false while a paused queue holds a task and stays false right
// after resume(); it is true again once the completion has been delivered.
test('pause && idle', function (t) {
  t.plan(11)

  var hasWorked = false

  function worker (arg, cb) {
    t.equal(arg, 42)
    hasWorked = true
    // The completion is scheduled first, so the idle check below runs after it.
    process.nextTick(cb.bind(null, null, true))
    process.nextTick(function () {
      t.ok(q.idle(), 'is should be idle')
    })
  }

  var q = buildQueue(worker, 1)

  t.notOk(q.paused, 'it should not be paused')
  t.ok(q.idle(), 'should be idle')

  q.pause()

  q.push(42, function (err, outcome) {
    t.error(err, 'no error')
    t.equal(outcome, true, 'result matches')
  })

  t.notOk(hasWorked, 'it should be paused')
  t.ok(q.paused, 'it should be paused')
  t.notOk(q.idle(), 'should not be idle')

  q.resume()

  t.notOk(q.paused, 'it should not be paused')
  t.notOk(q.idle(), 'it should not be idle')
})
|  | 
 | ||
// push() may be called without a completion callback; the worker still runs.
test('push without cb', function (t) {
  t.plan(1)

  // Completes with no arguments at all.
  function worker (task, next) {
    t.equal(task, 42)
    next()
  }

  var q = buildQueue(worker, 1)

  q.push(42)
})
|  | 
 | ||
// unshift() may be called without a completion callback; the worker still runs.
test('unshift without cb', function (t) {
  t.plan(1)

  // Completes with no arguments at all.
  function worker (task, next) {
    t.equal(task, 42)
    next()
  }

  var q = buildQueue(worker, 1)

  q.unshift(42)
})
|  | 
 | ||
// An error passed by the worker to its callback reaches both the handler
// registered through q.error() (together with the task) and the push callback.
test('push with worker throwing error', function (t) {
  t.plan(5)

  // Always fails by calling back with an Error and a null result.
  function failingWorker (task, cb) {
    cb(new Error('test error'), null)
  }

  var q = buildQueue(failingWorker, 1)

  q.error(function (err, task) {
    t.ok(err instanceof Error, 'global error handler should catch the error')
    t.match(err.message, /test error/, 'error message should be "test error"')
    t.equal(task, 42, 'The task executed should be passed')
  })

  q.push(42, function (err) {
    t.ok(err instanceof Error, 'push callback should catch the error')
    t.match(err.message, /test error/, 'error message should be "test error"')
  })
})