// PullStream.js (3.9 KB)
  1. var Stream = require('stream');
  2. var Promise = require('bluebird');
  3. var util = require('util');
  4. var Buffer = require('./Buffer');
  5. var strFunction = 'function';
  6. // Backwards compatibility for node versions < 8
  7. if (!Stream.Writable || !Stream.Writable.prototype.destroy)
  8. Stream = require('readable-stream');
  /**
   * Duplex stream whose writable side accumulates incoming data in
   * `this.buffer` so that it can be consumed on demand.
   *
   * May be invoked with or without `new`; either way a fresh instance is
   * returned.
   *
   * @constructor
   */
  function PullStream() {
    var stream = this;

    // Allow factory-style invocation: PullStream() behaves like new PullStream()
    if (!(stream instanceof PullStream)) {
      return new PullStream();
    }

    Stream.Duplex.call(stream, { decodeStrings: false, objectMode: true });

    // Data written to the stream but not yet consumed
    stream.buffer = Buffer.from('');

    // When the writable side finishes, record that fact and fire one last
    // 'chunk' event (with `false` as its argument) so listeners re-evaluate
    stream.on('finish', function onFinish() {
      stream.finished = true;
      stream.emit('chunk', false);
    });
  }

  util.inherits(PullStream, Stream.Duplex);
  /**
   * Writable-side hook: appends the incoming chunk to `this.buffer`, stores
   * the completion callback in `this.cb` (it is NOT invoked here), and emits
   * 'chunk' so that listeners can consume the new data.
   *
   * @param {Buffer|string} chunk - data being written
   * @param {string} e - encoding supplied by the stream machinery; only used
   *   when `chunk` is a string
   * @param {Function} cb - write-completion callback, kept for later use
   */
  PullStream.prototype._write = function(chunk,e,cb) {
    var data = chunk;

    // The constructor disables `decodeStrings`, so strings arrive undecoded.
    // Buffer.concat only accepts Buffers/Uint8Arrays and would throw on a
    // string, so convert it first.
    if (typeof data === 'string') {
      var encoding = (typeof e === 'string' && e !== 'buffer') ? e : 'utf8';
      data = Buffer.from(data, encoding);
    }

    this.buffer = Buffer.concat([this.buffer,data]);
    this.cb = cb;
    this.emit('chunk');
  };
  /**
   * Returns a PassThrough stream that is fed from the internal buffer until
   * the requested end condition is reached, at which point it is ended.
   *
   * The `eof` parameter is interpreted as `file_length` if the type is number
   * otherwise (i.e. buffer) it is interpreted as a pattern signaling end of
   * stream.
   *
   * If the writable side has finished before the end condition is met, an
   * Error('FILE_ENDED') is emitted on this PullStream (not on the returned
   * stream).
   *
   * @param {number|Buffer} eof - number of bytes to emit, or the pattern that
   *   terminates the output
   * @param {boolean} [includeEof] - pattern mode only: when truthy the matched
   *   pattern is emitted as the tail of the output; otherwise it is left at
   *   the start of the internal buffer
   * @returns {Stream.PassThrough} stream carrying the extracted bytes
   */
  PullStream.prototype.stream = function(eof,includeEof) {
    // Instantiate explicitly instead of relying on the constructor's
    // "called without new" fallback
    var p = new Stream.PassThrough();
    var done,self= this;

    // Invoke the write callback stored by _write (at most once per write)
    function cb() {
      if (typeof self.cb === strFunction) {
        var callback = self.cb;
        self.cb = undefined;
        return callback();
      }
    }

    // Move as much buffered data as currently allowed into `p`.
    // Runs once immediately and again on every 'chunk' event.
    function pull() {
      var packet;
      if (self.buffer && self.buffer.length) {
        if (typeof eof === 'number') {
          // Length mode: `eof` counts down the bytes still owed
          packet = self.buffer.slice(0,eof);
          self.buffer = self.buffer.slice(eof);
          eof -= packet.length;
          done = !eof;
        } else {
          var match = self.buffer.indexOf(eof);
          if (match !== -1) {
            // store signature match byte offset to allow us to reference
            // this for zip64 offset
            self.match = match;
            if (includeEof) match = match + eof.length;
            packet = self.buffer.slice(0,match);
            self.buffer = self.buffer.slice(match);
            done = true;
          } else {
            // No match yet: keep the last `eof.length` bytes in the buffer so
            // a pattern split across two writes is not emitted prematurely
            var len = self.buffer.length - eof.length;
            if (len <= 0) {
              // Nothing can be emitted safely; ask for more input
              cb();
            } else {
              packet = self.buffer.slice(0,len);
              self.buffer = self.buffer.slice(len);
            }
          }
        }
        if (packet) p.write(packet,function() {
          // Once the buffer is drained (or, in pattern mode, only the
          // held-back tail remains) release the pending write
          if (self.buffer.length === 0 || (eof.length && self.buffer.length <= eof.length)) cb();
        });
      }

      if (!done) {
        if (self.finished) {
          // Input ended before the end condition was satisfied
          self.removeListener('chunk',pull);
          self.emit('error', new Error('FILE_ENDED'));
          return;
        }
      } else {
        self.removeListener('chunk',pull);
        p.end();
      }
    }

    self.on('chunk',pull);
    pull();
    return p;
  };
  /**
   * Promise-based counterpart of `stream()`: resolves with everything that
   * `stream(eof, includeEof)` would emit, collected into a single Buffer.
   *
   * @param {number|Buffer} eof - number of bytes wanted, or a pattern marking
   *   the end of the data (see `stream`)
   * @param {boolean} [includeEof] - forwarded to `stream`
   * @returns {Promise} resolves with the collected Buffer (or with '' when
   *   `eof` is exactly 0); rejects with Error('FILE_ENDED') if the writable
   *   side has already finished, or with any error emitted by this stream or
   *   by the intermediate streams while collecting
   */
  PullStream.prototype.pull = function(eof,includeEof) {
    if (eof === 0) return Promise.resolve('');

    // If we already have the required data in buffer
    // we can resolve the request immediately.
    // Only genuine numbers qualify: the coercing global isNaN() would also
    // accept a Buffer pattern whose text happens to be numeric (or empty) and
    // wrongly treat it as a byte count.
    // The comparison is strict (>): this path never invokes the stored write
    // callback, so it only runs when at least one byte stays buffered
    // (presumably so a pending write is not left unacknowledged - confirm).
    if (typeof eof === 'number' && this.buffer.length > eof) {
      var data = this.buffer.slice(0,eof);
      this.buffer = this.buffer.slice(eof);
      return Promise.resolve(data);
    }

    // Otherwise we stream until we have it
    var buffer = Buffer.from(''),
        self = this;

    // Sink that accumulates everything piped into it and outputs nothing
    var concatStream = new Stream.Transform();
    concatStream._transform = function(d,e,cb) {
      buffer = Buffer.concat([buffer,d]);
      cb();
    };

    var pullStreamRejectHandler;
    return new Promise(function(resolve,reject) {
      pullStreamRejectHandler = function(e) {
        // Remember the error on the instance before rejecting
        self.__emittedError = e;
        reject(e);
      };
      if (self.finished)
        return reject(new Error('FILE_ENDED'));
      self.once('error',pullStreamRejectHandler);  // reject any errors from pullstream itself
      self.stream(eof,includeEof)
        .on('error',reject)
        .pipe(concatStream)
        .on('finish',function() {resolve(buffer);})
        .on('error',reject);
    })
    .finally(function() {
      // `reject` itself is only ever attached to the intermediate streams,
      // so this handler is the only listener to detach from `self`
      self.removeListener('error',pullStreamRejectHandler);
    });
  };
  /**
   * Readable-side hook required by Duplex. Deliberately a no-op: nothing
   * visible in this file pushes data to the readable side.
   */
  PullStream.prototype._read = function noopRead() {
    // intentionally empty
  };
  125. module.exports = PullStream;