var Stream = require('stream')

// through
//
// A stream that does nothing but re-emit its input.
// Useful for aggregating a series of changing but not ending streams
// into one stream.

exports = module.exports = through
through.through = through

/**
 * Create a stream that is both readable and writable.
 *
 * Every chunk passed to `stream.write(data)` is handed to `write`, and
 * `stream.end()` invokes `end`. Both callbacks run with `this` set to the
 * returned stream, so they can call `this.emit('data', ...)` and
 * `this.emit('end')`.
 *
 * @param {function(*)} [write] Called for each written chunk. When omitted
 *   (or falsy) the chunk is re-emitted unchanged as a 'data' event.
 * @param {function()} [end] Called once, on the first `stream.end()`. When
 *   omitted (or falsy) it emits 'end'.
 * @returns {Stream} A stream with `write`, `end`, `destroy`, `pause` and
 *   `resume` methods and `readable`, `writable` and `paused` flags.
 */
function through (write, end) {
  write = write || function (data) { this.emit('data', data) }
  end = end || function () { this.emit('end') }

  var ended = false
  var destroyed = false

  var stream = new Stream()
  stream.readable = stream.writable = true
  stream.paused = false

  // Hand a chunk to the `write` callback. Returns false while the stream is
  // paused so the writer can wait for 'drain'.
  // The callback is bound to `stream` rather than the caller's `this`, so the
  // method still works when passed around detached, e.g.
  // `source.on('data', stream.write)`.
  stream.write = function (data) {
    write.call(stream, data)
    return !stream.paused
  }

  // This is registered as the first 'end' listener.
  // destroy() must be called on the next tick, to make sure it runs after
  // any stream piped from here has handled 'end'.
  stream.on('end', function () {
    stream.readable = false
    if (!stream.writable) {
      process.nextTick(function () {
        stream.destroy()
      })
    }
  })

  // Finish the writable side, optionally writing one last chunk first.
  // A chunk is written whenever an argument is supplied, even `undefined`.
  stream.end = function (data) {
    // Repeated calls are ignored instead of throwing, because pipe does not
    // check `writable` before calling end().
    if (ended) return
    ended = true
    if (arguments.length) stream.write(data)
    stream.writable = false
    end.call(stream)
    // If the `end` callback emitted 'end', the readable side is closed too,
    // so the stream can be torn down right away.
    if (!stream.readable) stream.destroy()
  }

  // Close both sides and emit 'close'. Only the first call has any effect.
  stream.destroy = function () {
    if (destroyed) return
    destroyed = true
    ended = true
    stream.writable = stream.readable = false
    stream.emit('close')
  }

  // Mark the stream as paused and emit 'pause'; no-op when already paused.
  stream.pause = function () {
    if (stream.paused) return
    stream.paused = true
    stream.emit('pause')
  }

  // Clear the paused flag and emit 'drain'; no-op when not paused.
  stream.resume = function () {
    if (stream.paused) {
      stream.paused = false
      stream.emit('drain')
    }
  }

  return stream
}