stuttered-pipe.js 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. "use strict";
  2. const events = require('events');
  3. // =============================================================================
  4. // StutteredPipe - Used to slow down streaming so GC can get a look in
  /**
   * StutteredPipe - moves data from a readable stream to a writable stream one
   * chunk per `setImmediate` turn, instead of as fast as possible, so that
   * other work (e.g. garbage collection) can get a look in between chunks.
   *
   * State kept on the instance:
   *   - `paused`    set by pause(), cleared by resume(); while true no reads happen
   *   - `eod`       set once the readable has emitted 'end'
   *   - `scheduled` handle of the pending setImmediate, or null when none is pending
   */
  class StutteredPipe extends events.EventEmitter {
    /**
     * @param {object} readable - source stream; must support `on('end')`,
     *   `on('readable')` and `read(size)`.
     * @param {object} writable - destination; must support `write(chunk)` and `end()`.
     * @param {object} [options]
     * @param {number} [options.bufSize=16384] - size passed to `readable.read()`.
     * @param {boolean} [options.autoPause=false] - when true, the pipe does not
     *   reschedule itself after a successful write; the next read happens on
     *   the next `resume()` call (explicit, or via the 'readable' event).
     */
    constructor(readable, writable, options) {
      super();
      const opts = options || {};

      this.readable = readable;
      this.writable = writable;
      this.bufSize = opts.bufSize || 16384;
      this.autoPause = opts.autoPause || false;

      this.paused = false;
      this.eod = false;
      this.scheduled = null;

      // Once the source is exhausted, stop pumping and close the destination.
      readable.on('end', () => {
        this.eod = true;
        writable.end();
      });

      // need to have some way to communicate speed of stream
      // back from the consumer
      readable.on('readable', () => {
        if (!this.paused) {
          this.resume();
        }
      });

      this._schedule();
    }

    /**
     * Stop transferring data. A tick that is already scheduled still fires,
     * but it sees `paused` and neither reads nor reschedules.
     */
    pause() {
      this.paused = true;
    }

    /**
     * Resume transferring data after pause() (or after an autoPause stop).
     * Does nothing once the readable has ended.
     */
    resume() {
      if (this.eod) {
        return;
      }
      // Clear the flag set by pause(); without this the scheduled tick would
      // see `paused === true` and the pipe would stay stalled forever.
      this.paused = false;
      // Replace any pending tick so at most one is ever outstanding.
      if (this.scheduled !== null) {
        clearImmediate(this.scheduled);
      }
      this._schedule();
    }

    /**
     * Queue one read/write step on the next setImmediate turn.
     * Internal helper; callers should use pause()/resume().
     */
    _schedule() {
      this.scheduled = setImmediate(() => {
        this.scheduled = null;
        if (this.eod || this.paused) {
          return;
        }

        const data = this.readable.read(this.bufSize);
        if (data && data.length) {
          // NOTE: the return value of write() is ignored, so writable
          // backpressure is not observed here.
          this.writable.write(data);
          if (!this.paused && !this.autoPause) {
            this._schedule();
          }
        } else if (!this.paused) {
          // Nothing available this turn: poll again on the next one.
          this._schedule();
        }
      });
    }
  }
  57. module.exports = StutteredPipe;
  58. //# sourceMappingURL=stuttered-pipe.js.map