index.js 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. var Stream = require('stream')
  2. // through
  3. //
  4. // a stream that does nothing but re-emit the input.
  5. // useful for aggregating a series of changing but not ending streams into one stream)
  6. exports = module.exports = through
  7. through.through = through
  8. //create a readable writable stream.
  9. function through (write, end) {
  10. write = write || function (data) { this.emit('data', data) }
  11. end = end || function () { this.emit('end') }
  12. var ended = false, destroyed = false
  13. var stream = new Stream()
  14. stream.readable = stream.writable = true
  15. stream.paused = false
  16. stream.write = function (data) {
  17. write.call(this, data)
  18. return !stream.paused
  19. }
  20. //this will be registered as the first 'end' listener
  21. //must call destroy next tick, to make sure we're after any
  22. //stream piped from here.
  23. stream.on('end', function () {
  24. stream.readable = false
  25. if(!stream.writable)
  26. process.nextTick(function () {
  27. stream.destroy()
  28. })
  29. })
  30. stream.end = function (data) {
  31. if(ended) return
  32. //this breaks, because pipe doesn't check writable before calling end.
  33. //throw new Error('cannot call end twice')
  34. ended = true
  35. if(arguments.length) stream.write(data)
  36. this.writable = false
  37. end.call(this)
  38. if(!this.readable)
  39. this.destroy()
  40. }
  41. stream.destroy = function () {
  42. if(destroyed) return
  43. destroyed = true
  44. ended = true
  45. stream.writable = stream.readable = false
  46. stream.emit('close')
  47. }
  48. stream.pause = function () {
  49. if(stream.paused) return
  50. stream.paused = true
  51. stream.emit('pause')
  52. }
  53. stream.resume = function () {
  54. if(stream.paused) {
  55. stream.paused = false
  56. stream.emit('drain')
  57. }
  58. }
  59. return stream
  60. }