iterate-stream.js 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. module.exports = async function* iterateStream(stream) {
  2. const contents = [];
  3. stream.on('data', data => contents.push(data));
  4. let resolveStreamEndedPromise;
  5. const streamEndedPromise = new Promise(resolve => (resolveStreamEndedPromise = resolve));
  6. let ended = false;
  7. stream.on('end', () => {
  8. ended = true;
  9. resolveStreamEndedPromise();
  10. });
  11. let error = false;
  12. stream.on('error', err => {
  13. error = err;
  14. resolveStreamEndedPromise();
  15. });
  16. while (!ended || contents.length > 0) {
  17. if (contents.length === 0) {
  18. stream.resume();
  19. // eslint-disable-next-line no-await-in-loop
  20. await Promise.race([once(stream, 'data'), streamEndedPromise]);
  21. } else {
  22. stream.pause();
  23. const data = contents.shift();
  24. yield data;
  25. }
  26. if (error) throw error;
  27. }
  28. resolveStreamEndedPromise();
  29. };
  30. function once(eventEmitter, type) {
  31. // TODO: Use require('events').once when node v10 is dropped
  32. return new Promise(resolve => {
  33. let fired = false;
  34. const handler = () => {
  35. if (!fired) {
  36. fired = true;
  37. eventEmitter.removeListener(type, handler);
  38. resolve();
  39. }
  40. };
  41. eventEmitter.addListener(type, handler);
  42. });
  43. }