| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- var Stream = require('stream')
- // through
- //
- // a stream that does nothing but re-emit the input.
- // (useful for aggregating a series of changing but not ending streams into one stream)
- exports = module.exports = through // the module's export is the `through` function itself
- through.through = through // also expose the function as a `.through` property of itself
- //create a readable writable stream.
/**
 * Create a stream that is both readable and writable, built on the
 * legacy `Stream` base class.
 *
 * With no arguments the stream simply re-emits every written chunk as a
 * 'data' event and emits 'end' when `end()` is called.
 *
 * @param {Function} [write] Called as `write.call(stream, data)` for every
 *   chunk passed to `stream.write`. Defaults to emitting 'data' with the chunk.
 * @param {Function} [end] Called as `end.call(stream)` the first time
 *   `stream.end` is called. Defaults to emitting 'end'.
 * @returns {Stream} The new stream, with `write`, `end`, `destroy`, `pause`
 *   and `resume` methods plus `readable`, `writable` and `paused` flags.
 */
function through (write, end) {
  write = write || function (data) { this.emit('data', data) }
  end = end || function () { this.emit('end') }

  var ended = false, destroyed = false
  var stream = new Stream()
  stream.readable = stream.writable = true
  stream.paused = false

  // Every method below refers to the closed-over `stream` rather than
  // `this`, so the methods keep working when handed around as unbound
  // callbacks, e.g. `source.on('data', stream.write)`.

  // Hands the chunk to the `write` handler; returns false while paused
  // so that a well-behaved writer can wait for 'drain'.
  stream.write = function (data) {
    write.call(stream, data)
    return !stream.paused
  }

  //this will be registered as the first 'end' listener
  //must call destroy next tick, to make sure we're after any
  //stream piped from here.
  stream.on('end', function () {
    stream.readable = false
    if (!stream.writable)
      process.nextTick(function () {
        stream.destroy()
      })
  })

  // Ends the writable side. An optional final chunk is written first.
  stream.end = function (data) {
    //a second call is ignored rather than throwing, because pipe
    //doesn't check writable before calling end.
    if (ended) return
    ended = true
    if (arguments.length) stream.write(data)
    stream.writable = false
    end.call(stream)
    // If the `end` handler already emitted 'end', the readable side is
    // closed as well and the stream can be torn down right away.
    if (!stream.readable)
      stream.destroy()
  }

  // Marks the stream as finished on both sides and emits 'close' once.
  stream.destroy = function () {
    if (destroyed) return
    destroyed = true
    ended = true
    stream.writable = stream.readable = false
    stream.emit('close')
  }

  // Sets the paused flag (making `write` return false) and emits 'pause'.
  stream.pause = function () {
    if (stream.paused) return
    stream.paused = true
    stream.emit('pause')
  }

  // Clears the paused flag and emits 'drain'; a no-op when not paused.
  stream.resume = function () {
    if (stream.paused) {
      stream.paused = false
      stream.emit('drain')
    }
  }

  return stream
}
|