BaseWorkerPool.js 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. 'use strict';
  2. Object.defineProperty(exports, '__esModule', {
  3. value: true
  4. });
  5. exports.default = void 0;
  6. function _mergeStream() {
  7. const data = _interopRequireDefault(require('merge-stream'));
  8. _mergeStream = function () {
  9. return data;
  10. };
  11. return data;
  12. }
  13. var _types = require('../types');
  14. function _interopRequireDefault(obj) {
  15. return obj && obj.__esModule ? obj : {default: obj};
  16. }
  17. /**
  18. * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
  19. *
  20. * This source code is licensed under the MIT license found in the
  21. * LICENSE file in the root directory of this source tree.
  22. */
  23. // How long to wait for the child process to terminate
  24. // after CHILD_MESSAGE_END before sending force exiting.
  25. const FORCE_EXIT_DELAY = 500;
  26. /* istanbul ignore next */
  27. // eslint-disable-next-line @typescript-eslint/no-empty-function
  28. const emptyMethod = () => {};
  29. class BaseWorkerPool {
  30. _stderr;
  31. _stdout;
  32. _options;
  33. _workers;
  34. constructor(workerPath, options) {
  35. this._options = options;
  36. this._workers = new Array(options.numWorkers);
  37. const stdout = (0, _mergeStream().default)();
  38. const stderr = (0, _mergeStream().default)();
  39. const {forkOptions, maxRetries, resourceLimits, setupArgs} = options;
  40. for (let i = 0; i < options.numWorkers; i++) {
  41. const workerOptions = {
  42. forkOptions,
  43. maxRetries,
  44. resourceLimits,
  45. setupArgs,
  46. workerId: i,
  47. workerPath
  48. };
  49. const worker = this.createWorker(workerOptions);
  50. const workerStdout = worker.getStdout();
  51. const workerStderr = worker.getStderr();
  52. if (workerStdout) {
  53. stdout.add(workerStdout);
  54. }
  55. if (workerStderr) {
  56. stderr.add(workerStderr);
  57. }
  58. this._workers[i] = worker;
  59. }
  60. this._stdout = stdout;
  61. this._stderr = stderr;
  62. }
  63. getStderr() {
  64. return this._stderr;
  65. }
  66. getStdout() {
  67. return this._stdout;
  68. }
  69. getWorkers() {
  70. return this._workers;
  71. }
  72. getWorkerById(workerId) {
  73. return this._workers[workerId];
  74. }
  75. createWorker(_workerOptions) {
  76. throw Error('Missing method createWorker in WorkerPool');
  77. }
  78. async end() {
  79. // We do not cache the request object here. If so, it would only be only
  80. // processed by one of the workers, and we want them all to close.
  81. const workerExitPromises = this._workers.map(async worker => {
  82. worker.send(
  83. [_types.CHILD_MESSAGE_END, false],
  84. emptyMethod,
  85. emptyMethod,
  86. emptyMethod
  87. ); // Schedule a force exit in case worker fails to exit gracefully so
  88. // await worker.waitForExit() never takes longer than FORCE_EXIT_DELAY
  89. let forceExited = false;
  90. const forceExitTimeout = setTimeout(() => {
  91. worker.forceExit();
  92. forceExited = true;
  93. }, FORCE_EXIT_DELAY);
  94. await worker.waitForExit(); // Worker ideally exited gracefully, don't send force exit then
  95. clearTimeout(forceExitTimeout);
  96. return forceExited;
  97. });
  98. const workerExits = await Promise.all(workerExitPromises);
  99. return workerExits.reduce(
  100. (result, forceExited) => ({
  101. forceExited: result.forceExited || forceExited
  102. }),
  103. {
  104. forceExited: false
  105. }
  106. );
  107. }
  108. }
  109. exports.default = BaseWorkerPool;