Farm.js 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  'use strict';

  // Flag the exports object as originating from ES-module syntax, so that
  // interop helpers checking `__esModule` hand it back as-is instead of
  // wrapping it in `{default: ...}`.
  Object.defineProperty(exports, '__esModule', {value: true});

  // Create the `default` export slot up front; it stays undefined until the
  // real value is assigned further down in this file.
  exports.default = undefined;
  6. var _FifoQueue = _interopRequireDefault(require('./FifoQueue'));
  7. var _types = require('./types');
  /**
   * Normalises the result of `require()` so callers can always read `.default`.
   *
   * @param {*} obj - The value returned by `require()`.
   * @returns {*} `obj` itself when it is truthy and flagged with `__esModule`;
   *   otherwise a fresh wrapper object `{default: obj}`.
   */
  function _interopRequireDefault(obj) {
    // Modules flagged as ES modules already carry their own `default` member.
    if (obj && obj.__esModule) {
      return obj;
    }
    // Anything else (including null/undefined) becomes the default export.
    return {default: obj};
  }
  11. /**
  12. * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
  13. *
  14. * This source code is licensed under the MIT license found in the
  15. * LICENSE file in the root directory of this source tree.
  16. */
  /**
   * Schedules method calls onto a fixed number of workers.
   *
   * Pending tasks live in a task queue; a per-worker lock ensures that at most
   * one task is handed to a given worker id at a time. The farm does not run
   * anything itself: each task is passed to the `_callback` supplied to the
   * constructor, which must eventually invoke the `onEnd` it receives.
   */
  class Farm {
    // Optional function `(method, ...args) => key` used to pin calls to a worker.
    _computeWorkerKey;
    // 'round-robin' or 'in-order'; picks the worker id a new unbound task is
    // offered to first.
    _workerSchedulingPolicy;
    // Maps a worker key to the worker object reported through `onStart`.
    // Created without a prototype so arbitrary keys cannot hit inherited names.
    _cacheKeys = Object.create(null);
    // `_locks[workerId]` is true while that worker id has a task in flight.
    _locks = [];
    // Counter returned (then incremented) for every round-robin dispatch.
    _offset = 0;
    // Queue of pending tasks; must provide `enqueue(task, workerId?)` and
    // `dequeue(workerId)`.
    _taskQueue;

    /**
     * @param {number} _numOfWorkers - Number of worker ids; ids used by the farm
     *   are `0 .. _numOfWorkers - 1`.
     * @param {Function} _callback - Called as
     *   `(workerId, request, onStart, onEnd, onCustomMessage)` to execute a task.
     * @param {object} [options]
     * @param {Function} [options.computeWorkerKey] - Returns a key for a call;
     *   calls sharing a non-null key are routed to the worker that first
     *   started a task with that key.
     * @param {string} [options.workerSchedulingPolicy] - 'round-robin' (the
     *   default) or 'in-order'.
     * @param {object} [options.taskQueue] - Custom queue; a new FifoQueue is
     *   created when omitted.
     */
    constructor(_numOfWorkers, _callback, options = {}) {
      this._numOfWorkers = _numOfWorkers;
      this._callback = _callback;
      this._computeWorkerKey = options.computeWorkerKey;

      const policy = options.workerSchedulingPolicy;
      this._workerSchedulingPolicy = policy != null ? policy : 'round-robin';

      const queue = options.taskQueue;
      this._taskQueue = queue != null ? queue : new _FifoQueue.default();
    }

    /**
     * Queues a call to `method` with `args` and dispatches it to a worker.
     *
     * @param {string} method - Name of the method to invoke.
     * @param {...*} args - Arguments forwarded with the request.
     * @returns {Promise<*>} Settles with the result or error passed to the
     *   task's `onEnd`. The promise also exposes
     *   `UNSTABLE_onCustomMessage(listener)`, which registers a listener for
     *   custom messages and returns a function that removes it again.
     */
    doWork(method, ...args) {
      const customMessageListeners = new Set();
      const addCustomMessageListener = listener => {
        customMessageListeners.add(listener);
        return () => {
          customMessageListeners.delete(listener);
        };
      };
      const onCustomMessage = message => {
        customMessageListeners.forEach(listener => listener(message));
      };
      const promise = new Promise(
        // Bind args to this function so it won't reference to the parent scope.
        // This prevents a memory leak in v8, because otherwise the function will
        // retain args for the closure.
        ((args, resolve, reject) => {
          const computeWorkerKey = this._computeWorkerKey;
          // Request layout: [message type, "already processed" flag, method, args].
          const request = [_types.CHILD_MESSAGE_CALL, false, method, args];
          let worker = null;
          let hash = null;
          if (computeWorkerKey) {
            hash = computeWorkerKey.call(this, method, ...args);
            worker = hash == null ? null : this._cacheKeys[hash];
          }
          // Remember which worker picked up a keyed task so that later calls
          // with the same key are bound to it.
          const onStart = worker => {
            if (hash != null) {
              this._cacheKeys[hash] = worker;
            }
          };
          const onEnd = (error, result) => {
            customMessageListeners.clear();
            if (error) {
              reject(error);
            } else {
              resolve(result);
            }
          };
          const task = {
            onCustomMessage,
            onEnd,
            onStart,
            request
          };
          if (worker) {
            // Key already bound: queue the task for that specific worker.
            this._taskQueue.enqueue(task, worker.getWorkerId());
            this._process(worker.getWorkerId());
          } else {
            this._push(task);
          }
        }).bind(null, args)
      );
      promise.UNSTABLE_onCustomMessage = addCustomMessageListener;
      return promise;
    }

    /**
     * Hands the next queued task for `workerId` to the callback, unless that
     * worker is locked or the queue has nothing for it. When the task ends the
     * worker is unlocked and the next task for it is processed.
     *
     * If the callback throws synchronously before the task has ended, the task
     * is failed with the thrown value and the worker is unlocked, so one bad
     * dispatch cannot leave the worker locked forever.
     *
     * @param {number} workerId - Worker id to dispatch to.
     * @returns {Farm} This farm, for chaining.
     * @throws {Error} If the queue returns a task already marked as processed.
     */
    _process(workerId) {
      if (this._isLocked(workerId)) {
        return this;
      }
      const task = this._taskQueue.dequeue(workerId);
      if (!task) {
        return this;
      }
      if (task.request[1]) {
        throw new Error('Queue implementation returned processed task');
      }

      // Reference the task object outside so it won't be retained by onEnd,
      // and other properties of the task object, such as task.request can be
      // garbage collected.
      let taskOnEnd = task.onEnd;
      // Records whether onEnd has run, so a synchronous throw from the callback
      // can tell "never completed" apart from "completed, then threw".
      let ended = false;
      const onEnd = (error, result) => {
        ended = true;
        if (taskOnEnd) {
          taskOnEnd(error, result);
        }
        taskOnEnd = null;
        this._unlock(workerId);
        this._process(workerId);
      };

      task.request[1] = true;
      this._lock(workerId);
      try {
        this._callback(
          workerId,
          task.request,
          task.onStart,
          onEnd,
          task.onCustomMessage
        );
      } catch (error) {
        if (ended) {
          // The task already finished and released its lock; surface the
          // failure to the caller exactly as before.
          throw error;
        }
        // Fail the task through the regular completion path so its promise is
        // rejected, the lock is released and the queue keeps draining. A falsy
        // thrown value is wrapped, because onEnd treats a falsy error as success.
        onEnd(
          error ||
            new Error(`Worker callback threw a falsy value: ${String(error)}`)
        );
      }
      return this;
    }

    /**
     * Adds an unbound task to the queue and offers it to each worker in turn,
     * starting at the offset chosen by the scheduling policy, until the task
     * has been picked up or every worker has been tried.
     *
     * @param {object} task - Task with `request`, `onStart`, `onEnd` and
     *   `onCustomMessage` members.
     * @returns {Farm} This farm, for chaining.
     * @throws {Error} If the scheduling policy is not recognised.
     */
    _push(task) {
      // Resolve the offset first: if the policy is invalid this throws before
      // the task is queued, so no orphaned task is left behind.
      const offset = this._getNextWorkerOffset();
      this._taskQueue.enqueue(task);
      for (let i = 0; i < this._numOfWorkers; i++) {
        this._process((offset + i) % this._numOfWorkers);
        // request[1] flips to true as soon as some worker has taken the task.
        if (task.request[1]) {
          break;
        }
      }
      return this;
    }

    /**
     * Returns the worker offset at which `_push` starts offering a new task.
     *
     * @returns {number} 0 for 'in-order'; an ever-increasing counter for
     *   'round-robin' (the caller reduces it modulo the number of workers).
     * @throws {Error} If the scheduling policy is neither of the above.
     */
    _getNextWorkerOffset() {
      switch (this._workerSchedulingPolicy) {
        case 'in-order':
          return 0;
        case 'round-robin':
          return this._offset++;
        default:
          // Without this branch the method returned undefined, which became a
          // NaN worker id in _push.
          throw new Error(
            `Unknown worker scheduling policy: ${String(
              this._workerSchedulingPolicy
            )}`
          );
      }
    }

    /** Marks `workerId` as busy. */
    _lock(workerId) {
      this._locks[workerId] = true;
    }

    /** Marks `workerId` as free. */
    _unlock(workerId) {
      this._locks[workerId] = false;
    }

    /**
     * @returns {boolean|undefined} true while `workerId` is busy; false after it
     *   has been unlocked; undefined if it has never been locked.
     */
    _isLocked(workerId) {
      return this._locks[workerId];
    }
  }
  // Publish the Farm class as this module's `default` export.
  155. exports.default = Farm;