| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748 |
- module.exports = async function* iterateStream(stream) {
- const contents = [];
- stream.on('data', data => contents.push(data));
- let resolveStreamEndedPromise;
- const streamEndedPromise = new Promise(resolve => (resolveStreamEndedPromise = resolve));
- let ended = false;
- stream.on('end', () => {
- ended = true;
- resolveStreamEndedPromise();
- });
- let error = false;
- stream.on('error', err => {
- error = err;
- resolveStreamEndedPromise();
- });
- while (!ended || contents.length > 0) {
- if (contents.length === 0) {
- stream.resume();
- // eslint-disable-next-line no-await-in-loop
- await Promise.race([once(stream, 'data'), streamEndedPromise]);
- } else {
- stream.pause();
- const data = contents.shift();
- yield data;
- }
- if (error) throw error;
- }
- resolveStreamEndedPromise();
- };
- function once(eventEmitter, type) {
- // TODO: Use require('events').once when node v10 is dropped
- return new Promise(resolve => {
- let fired = false;
- const handler = () => {
- if (!fired) {
- fired = true;
- eventEmitter.removeListener(type, handler);
- resolve();
- }
- };
- eventEmitter.addListener(type, handler);
- });
- }
|