stuttered-pipe.js 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. const events = require('events');
  2. // =============================================================================
  3. // StutteredPipe - Used to slow down streaming so GC can get a look in
  /**
   * Moves data from a readable stream to a writable stream one chunk at a
   * time, with each read/write performed in its own `setImmediate` callback
   * so that other work can run between chunks.
   *
   * When the source emits `'end'`, the pipe stops and `writable.end()` is
   * called. The return value of `writable.write()` is not inspected.
   */
  class StutteredPipe extends events.EventEmitter {
    /**
     * @param {object} readable - Source; must provide `on(event, fn)` and `read(size)`.
     * @param {object} writable - Sink; must provide `write(chunk)` and `end()`.
     * @param {object} [options]
     * @param {number} [options.bufSize=16384] - Size passed to each `readable.read()` call.
     * @param {boolean} [options.autoPause=false] - When true, no further read is
     *   queued after a chunk has been written; the next read happens once
     *   `resume()` is called or the source emits `'readable'`.
     */
    constructor(readable, writable, options) {
      super();

      const { bufSize, autoPause } = options || {};

      this.readable = readable;
      this.writable = writable;
      this.bufSize = bufSize || 16384;
      this.autoPause = autoPause || false;

      this.paused = false; // set by pause(), cleared by resume()
      this.eod = false; // becomes true once the source has emitted 'end'
      this.scheduled = null; // handle of the pending setImmediate, if any

      readable.on('end', () => {
        this.eod = true;
        writable.end();
      });

      // need to have some way to communicate speed of stream
      // back from the consumer
      readable.on('readable', () => {
        if (!this.paused) {
          this.resume();
        }
      });

      this._schedule();
    }

    /**
     * Stops transferring data. A read that is already queued still fires but
     * does nothing while the pipe is paused.
     */
    pause() {
      this.paused = true;
    }

    /**
     * Clears the paused state and queues the next read, replacing any read
     * that is already queued. Does not queue anything once the source has
     * ended.
     */
    resume() {
      // Without this reset a single pause() would stall the pipe for good:
      // the queued callback checks `paused` and bails out.
      this.paused = false;

      if (this.eod) {
        return;
      }
      if (this.scheduled !== null) {
        clearImmediate(this.scheduled);
      }
      this._schedule();
    }

    /**
     * Queues one read/write step on the next `setImmediate` turn.
     * Internal; use `pause()` / `resume()` to control the pipe.
     */
    _schedule() {
      this.scheduled = setImmediate(() => {
        this.scheduled = null;
        if (this.eod || this.paused) {
          return;
        }

        const data = this.readable.read(this.bufSize);
        if (data && data.length) {
          this.writable.write(data);
          // write() may have led the consumer to call pause(), so re-check.
          if (!this.paused && !this.autoPause) {
            this._schedule();
          }
        } else if (!this.paused) {
          // Nothing available yet: poll again on the next turn.
          this._schedule();
        }
      });
    }
  }
  56. module.exports = StutteredPipe;