queue.js 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. 'use strict'
  2. var reusify = require('reusify')
  3. function fastqueue (context, worker, concurrency) {
  4. if (typeof context === 'function') {
  5. concurrency = worker
  6. worker = context
  7. context = null
  8. }
  9. var cache = reusify(Task)
  10. var queueHead = null
  11. var queueTail = null
  12. var _running = 0
  13. var self = {
  14. push: push,
  15. drain: noop,
  16. saturated: noop,
  17. pause: pause,
  18. paused: false,
  19. concurrency: concurrency,
  20. running: running,
  21. resume: resume,
  22. idle: idle,
  23. length: length,
  24. getQueue: getQueue,
  25. unshift: unshift,
  26. empty: noop,
  27. kill: kill,
  28. killAndDrain: killAndDrain
  29. }
  30. return self
  31. function running () {
  32. return _running
  33. }
  34. function pause () {
  35. self.paused = true
  36. }
  37. function length () {
  38. var current = queueHead
  39. var counter = 0
  40. while (current) {
  41. current = current.next
  42. counter++
  43. }
  44. return counter
  45. }
  46. function getQueue () {
  47. var current = queueHead
  48. var tasks = []
  49. while (current) {
  50. tasks.push(current.value)
  51. current = current.next
  52. }
  53. return tasks
  54. }
  55. function resume () {
  56. if (!self.paused) return
  57. self.paused = false
  58. for (var i = 0; i < self.concurrency; i++) {
  59. _running++
  60. release()
  61. }
  62. }
  63. function idle () {
  64. return _running === 0 && self.length() === 0
  65. }
  66. function push (value, done) {
  67. var current = cache.get()
  68. current.context = context
  69. current.release = release
  70. current.value = value
  71. current.callback = done || noop
  72. if (_running === self.concurrency || self.paused) {
  73. if (queueTail) {
  74. queueTail.next = current
  75. queueTail = current
  76. } else {
  77. queueHead = current
  78. queueTail = current
  79. self.saturated()
  80. }
  81. } else {
  82. _running++
  83. worker.call(context, current.value, current.worked)
  84. }
  85. }
  86. function unshift (value, done) {
  87. var current = cache.get()
  88. current.context = context
  89. current.release = release
  90. current.value = value
  91. current.callback = done || noop
  92. if (_running === self.concurrency || self.paused) {
  93. if (queueHead) {
  94. current.next = queueHead
  95. queueHead = current
  96. } else {
  97. queueHead = current
  98. queueTail = current
  99. self.saturated()
  100. }
  101. } else {
  102. _running++
  103. worker.call(context, current.value, current.worked)
  104. }
  105. }
  106. function release (holder) {
  107. if (holder) {
  108. cache.release(holder)
  109. }
  110. var next = queueHead
  111. if (next) {
  112. if (!self.paused) {
  113. if (queueTail === queueHead) {
  114. queueTail = null
  115. }
  116. queueHead = next.next
  117. next.next = null
  118. worker.call(context, next.value, next.worked)
  119. if (queueTail === null) {
  120. self.empty()
  121. }
  122. } else {
  123. _running--
  124. }
  125. } else if (--_running === 0) {
  126. self.drain()
  127. }
  128. }
  129. function kill () {
  130. queueHead = null
  131. queueTail = null
  132. self.drain = noop
  133. }
  134. function killAndDrain () {
  135. queueHead = null
  136. queueTail = null
  137. self.drain()
  138. self.drain = noop
  139. }
  140. }
  141. function noop () {}
  142. function Task () {
  143. this.value = null
  144. this.callback = noop
  145. this.next = null
  146. this.release = noop
  147. this.context = null
  148. var self = this
  149. this.worked = function worked (err, result) {
  150. var callback = self.callback
  151. self.value = null
  152. self.callback = noop
  153. callback.call(self.context, err, result)
  154. self.release(self)
  155. }
  156. }
  157. module.exports = fastqueue