// queue.js
  1. 'use strict';
  2. Object.defineProperty(exports, "__esModule", {
  3. value: true
  4. });
  5. exports.default = queue;
  6. var _baseIndexOf = require('lodash/_baseIndexOf');
  7. var _baseIndexOf2 = _interopRequireDefault(_baseIndexOf);
  8. var _isArray = require('lodash/isArray');
  9. var _isArray2 = _interopRequireDefault(_isArray);
  10. var _noop = require('lodash/noop');
  11. var _noop2 = _interopRequireDefault(_noop);
  12. var _onlyOnce = require('./onlyOnce');
  13. var _onlyOnce2 = _interopRequireDefault(_onlyOnce);
  14. var _setImmediate = require('./setImmediate');
  15. var _setImmediate2 = _interopRequireDefault(_setImmediate);
  16. var _DoublyLinkedList = require('./DoublyLinkedList');
  17. var _DoublyLinkedList2 = _interopRequireDefault(_DoublyLinkedList);
  18. var _wrapAsync = require('./wrapAsync');
  19. var _wrapAsync2 = _interopRequireDefault(_wrapAsync);
  20. function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
  21. function queue(worker, concurrency, payload) {
  22. if (concurrency == null) {
  23. concurrency = 1;
  24. } else if (concurrency === 0) {
  25. throw new Error('Concurrency must not be zero');
  26. }
  27. var _worker = (0, _wrapAsync2.default)(worker);
  28. var numRunning = 0;
  29. var workersList = [];
  30. function _insert(data, insertAtFront, callback) {
  31. if (callback != null && typeof callback !== 'function') {
  32. throw new Error('task callback must be a function');
  33. }
  34. q.started = true;
  35. if (!(0, _isArray2.default)(data)) {
  36. data = [data];
  37. }
  38. if (data.length === 0 && q.idle()) {
  39. // call drain immediately if there are no tasks
  40. return (0, _setImmediate2.default)(function () {
  41. q.drain();
  42. });
  43. }
  44. for (var i = 0, l = data.length; i < l; i++) {
  45. var item = {
  46. data: data[i],
  47. callback: callback || _noop2.default
  48. };
  49. if (insertAtFront) {
  50. q._tasks.unshift(item);
  51. } else {
  52. q._tasks.push(item);
  53. }
  54. }
  55. (0, _setImmediate2.default)(q.process);
  56. }
  57. function _next(tasks) {
  58. return function (err) {
  59. numRunning -= 1;
  60. for (var i = 0, l = tasks.length; i < l; i++) {
  61. var task = tasks[i];
  62. var index = (0, _baseIndexOf2.default)(workersList, task, 0);
  63. if (index >= 0) {
  64. workersList.splice(index, 1);
  65. }
  66. task.callback.apply(task, arguments);
  67. if (err != null) {
  68. q.error(err, task.data);
  69. }
  70. }
  71. if (numRunning <= q.concurrency - q.buffer) {
  72. q.unsaturated();
  73. }
  74. if (q.idle()) {
  75. q.drain();
  76. }
  77. q.process();
  78. };
  79. }
  80. var isProcessing = false;
  81. var q = {
  82. _tasks: new _DoublyLinkedList2.default(),
  83. concurrency: concurrency,
  84. payload: payload,
  85. saturated: _noop2.default,
  86. unsaturated: _noop2.default,
  87. buffer: concurrency / 4,
  88. empty: _noop2.default,
  89. drain: _noop2.default,
  90. error: _noop2.default,
  91. started: false,
  92. paused: false,
  93. push: function (data, callback) {
  94. _insert(data, false, callback);
  95. },
  96. kill: function () {
  97. q.drain = _noop2.default;
  98. q._tasks.empty();
  99. },
  100. unshift: function (data, callback) {
  101. _insert(data, true, callback);
  102. },
  103. remove: function (testFn) {
  104. q._tasks.remove(testFn);
  105. },
  106. process: function () {
  107. // Avoid trying to start too many processing operations. This can occur
  108. // when callbacks resolve synchronously (#1267).
  109. if (isProcessing) {
  110. return;
  111. }
  112. isProcessing = true;
  113. while (!q.paused && numRunning < q.concurrency && q._tasks.length) {
  114. var tasks = [],
  115. data = [];
  116. var l = q._tasks.length;
  117. if (q.payload) l = Math.min(l, q.payload);
  118. for (var i = 0; i < l; i++) {
  119. var node = q._tasks.shift();
  120. tasks.push(node);
  121. workersList.push(node);
  122. data.push(node.data);
  123. }
  124. numRunning += 1;
  125. if (q._tasks.length === 0) {
  126. q.empty();
  127. }
  128. if (numRunning === q.concurrency) {
  129. q.saturated();
  130. }
  131. var cb = (0, _onlyOnce2.default)(_next(tasks));
  132. _worker(data, cb);
  133. }
  134. isProcessing = false;
  135. },
  136. length: function () {
  137. return q._tasks.length;
  138. },
  139. running: function () {
  140. return numRunning;
  141. },
  142. workersList: function () {
  143. return workersList;
  144. },
  145. idle: function () {
  146. return q._tasks.length + numRunning === 0;
  147. },
  148. pause: function () {
  149. q.paused = true;
  150. },
  151. resume: function () {
  152. if (q.paused === false) {
  153. return;
  154. }
  155. q.paused = false;
  156. (0, _setImmediate2.default)(q.process);
  157. }
  158. };
  159. return q;
  160. }
  161. module.exports = exports['default'];