stream-buf.js 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. /* eslint-disable max-classes-per-file */
  2. const Stream = require('readable-stream');
  3. const utils = require('./utils');
  4. const StringBuf = require('./string-buf');
  5. // =============================================================================
  6. // data chunks - encapsulating incoming data
  7. class StringChunk {
  8. constructor(data, encoding) {
  9. this._data = data;
  10. this._encoding = encoding;
  11. }
  12. get length() {
  13. return this.toBuffer().length;
  14. }
  15. // copy to target buffer
  16. copy(target, targetOffset, offset, length) {
  17. return this.toBuffer().copy(target, targetOffset, offset, length);
  18. }
  19. toBuffer() {
  20. if (!this._buffer) {
  21. this._buffer = Buffer.from(this._data, this._encoding);
  22. }
  23. return this._buffer;
  24. }
  25. }
  26. class StringBufChunk {
  27. constructor(data) {
  28. this._data = data;
  29. }
  30. get length() {
  31. return this._data.length;
  32. }
  33. // copy to target buffer
  34. copy(target, targetOffset, offset, length) {
  35. // eslint-disable-next-line no-underscore-dangle
  36. return this._data._buf.copy(target, targetOffset, offset, length);
  37. }
  38. toBuffer() {
  39. return this._data.toBuffer();
  40. }
  41. }
  42. class BufferChunk {
  43. constructor(data) {
  44. this._data = data;
  45. }
  46. get length() {
  47. return this._data.length;
  48. }
  49. // copy to target buffer
  50. copy(target, targetOffset, offset, length) {
  51. this._data.copy(target, targetOffset, offset, length);
  52. }
  53. toBuffer() {
  54. return this._data;
  55. }
  56. }
  57. // =============================================================================
  58. // ReadWriteBuf - a single buffer supporting simple read-write
  59. class ReadWriteBuf {
  60. constructor(size) {
  61. this.size = size;
  62. // the buffer
  63. this.buffer = Buffer.alloc(size);
  64. // read index
  65. this.iRead = 0;
  66. // write index
  67. this.iWrite = 0;
  68. }
  69. toBuffer() {
  70. if (this.iRead === 0 && this.iWrite === this.size) {
  71. return this.buffer;
  72. }
  73. const buf = Buffer.alloc(this.iWrite - this.iRead);
  74. this.buffer.copy(buf, 0, this.iRead, this.iWrite);
  75. return buf;
  76. }
  77. get length() {
  78. return this.iWrite - this.iRead;
  79. }
  80. get eod() {
  81. return this.iRead === this.iWrite;
  82. }
  83. get full() {
  84. return this.iWrite === this.size;
  85. }
  86. read(size) {
  87. let buf;
  88. // read size bytes from buffer and return buffer
  89. if (size === 0) {
  90. // special case - return null if no data requested
  91. return null;
  92. }
  93. if (size === undefined || size >= this.length) {
  94. // if no size specified or size is at least what we have then return all of the bytes
  95. buf = this.toBuffer();
  96. this.iRead = this.iWrite;
  97. return buf;
  98. }
  99. // otherwise return a chunk
  100. buf = Buffer.alloc(size);
  101. this.buffer.copy(buf, 0, this.iRead, size);
  102. this.iRead += size;
  103. return buf;
  104. }
  105. write(chunk, offset, length) {
  106. // write as many bytes from data from optional source offset
  107. // and return number of bytes written
  108. const size = Math.min(length, this.size - this.iWrite);
  109. chunk.copy(this.buffer, this.iWrite, offset, offset + size);
  110. this.iWrite += size;
  111. return size;
  112. }
  113. }
  114. // =============================================================================
  115. // StreamBuf - a multi-purpose read-write stream
  116. // As MemBuf - write as much data as you like. Then call toBuffer() to consolidate
  117. // As StreamHub - pipe to multiple writables
  118. // As readable stream - feed data into the writable part and have some other code read from it.
  119. // Note: Not sure why but StreamBuf does not like JS "class" sugar. It fails the
  120. // integration tests
  121. const StreamBuf = function(options) {
  122. options = options || {};
  123. this.bufSize = options.bufSize || 1024 * 1024;
  124. this.buffers = [];
  125. // batch mode fills a buffer completely before passing the data on
  126. // to pipes or 'readable' event listeners
  127. this.batch = options.batch || false;
  128. this.corked = false;
  129. // where in the current writable buffer we're up to
  130. this.inPos = 0;
  131. // where in the current readable buffer we've read up to
  132. this.outPos = 0;
  133. // consuming pipe streams go here
  134. this.pipes = [];
  135. // controls emit('data')
  136. this.paused = false;
  137. this.encoding = null;
  138. };
  139. utils.inherits(StreamBuf, Stream.Duplex, {
  140. toBuffer() {
  141. switch (this.buffers.length) {
  142. case 0:
  143. return null;
  144. case 1:
  145. return this.buffers[0].toBuffer();
  146. default:
  147. return Buffer.concat(this.buffers.map(rwBuf => rwBuf.toBuffer()));
  148. }
  149. },
  150. // writable
  151. // event drain - if write returns false (which it won't), indicates when safe to write again.
  152. // finish - end() has been called
  153. // pipe(src) - pipe() has been called on readable
  154. // unpipe(src) - unpipe() has been called on readable
  155. // error - duh
  156. _getWritableBuffer() {
  157. if (this.buffers.length) {
  158. const last = this.buffers[this.buffers.length - 1];
  159. if (!last.full) {
  160. return last;
  161. }
  162. }
  163. const buf = new ReadWriteBuf(this.bufSize);
  164. this.buffers.push(buf);
  165. return buf;
  166. },
  167. async _pipe(chunk) {
  168. const write = function(pipe) {
  169. return new Promise(resolve => {
  170. pipe.write(chunk.toBuffer(), () => {
  171. resolve();
  172. });
  173. });
  174. };
  175. await Promise.all(this.pipes.map(write));
  176. },
  177. _writeToBuffers(chunk) {
  178. let inPos = 0;
  179. const inLen = chunk.length;
  180. while (inPos < inLen) {
  181. // find writable buffer
  182. const buffer = this._getWritableBuffer();
  183. // write some data
  184. inPos += buffer.write(chunk, inPos, inLen - inPos);
  185. }
  186. },
  187. async write(data, encoding, callback) {
  188. if (encoding instanceof Function) {
  189. callback = encoding;
  190. encoding = 'utf8';
  191. }
  192. callback = callback || utils.nop;
  193. // encapsulate data into a chunk
  194. let chunk;
  195. if (data instanceof StringBuf) {
  196. chunk = new StringBufChunk(data);
  197. } else if (data instanceof Buffer) {
  198. chunk = new BufferChunk(data);
  199. } else if (typeof data === 'string' || data instanceof String || data instanceof ArrayBuffer) {
  200. chunk = new StringChunk(data, encoding);
  201. } else {
  202. throw new Error('Chunk must be one of type String, Buffer or StringBuf.');
  203. }
  204. // now, do something with the chunk
  205. if (this.pipes.length) {
  206. if (this.batch) {
  207. this._writeToBuffers(chunk);
  208. while (!this.corked && this.buffers.length > 1) {
  209. this._pipe(this.buffers.shift());
  210. }
  211. } else if (!this.corked) {
  212. await this._pipe(chunk);
  213. callback();
  214. } else {
  215. this._writeToBuffers(chunk);
  216. process.nextTick(callback);
  217. }
  218. } else {
  219. if (!this.paused) {
  220. this.emit('data', chunk.toBuffer());
  221. }
  222. this._writeToBuffers(chunk);
  223. this.emit('readable');
  224. }
  225. return true;
  226. },
  227. cork() {
  228. this.corked = true;
  229. },
  230. _flush(/* destination */) {
  231. // if we have comsumers...
  232. if (this.pipes.length) {
  233. // and there's stuff not written
  234. while (this.buffers.length) {
  235. this._pipe(this.buffers.shift());
  236. }
  237. }
  238. },
  239. uncork() {
  240. this.corked = false;
  241. this._flush();
  242. },
  243. end(chunk, encoding, callback) {
  244. const writeComplete = error => {
  245. if (error) {
  246. callback(error);
  247. } else {
  248. this._flush();
  249. this.pipes.forEach(pipe => {
  250. pipe.end();
  251. });
  252. this.emit('finish');
  253. }
  254. };
  255. if (chunk) {
  256. this.write(chunk, encoding, writeComplete);
  257. } else {
  258. writeComplete();
  259. }
  260. },
  261. // readable
  262. // event readable - some data is now available
  263. // event data - switch to flowing mode - feeds chunks to handler
  264. // event end - no more data
  265. // event close - optional, indicates upstream close
  266. // event error - duh
  267. read(size) {
  268. let buffers;
  269. // read min(buffer, size || infinity)
  270. if (size) {
  271. buffers = [];
  272. while (size && this.buffers.length && !this.buffers[0].eod) {
  273. const first = this.buffers[0];
  274. const buffer = first.read(size);
  275. size -= buffer.length;
  276. buffers.push(buffer);
  277. if (first.eod && first.full) {
  278. this.buffers.shift();
  279. }
  280. }
  281. return Buffer.concat(buffers);
  282. }
  283. buffers = this.buffers.map(buf => buf.toBuffer()).filter(Boolean);
  284. this.buffers = [];
  285. return Buffer.concat(buffers);
  286. },
  287. setEncoding(encoding) {
  288. // causes stream.read or stream.on('data) to return strings of encoding instead of Buffer objects
  289. this.encoding = encoding;
  290. },
  291. pause() {
  292. this.paused = true;
  293. },
  294. resume() {
  295. this.paused = false;
  296. },
  297. isPaused() {
  298. return !!this.paused;
  299. },
  300. pipe(destination) {
  301. // add destination to pipe list & write current buffer
  302. this.pipes.push(destination);
  303. if (!this.paused && this.buffers.length) {
  304. this.end();
  305. }
  306. },
  307. unpipe(destination) {
  308. // remove destination from pipe list
  309. this.pipes = this.pipes.filter(pipe => pipe !== destination);
  310. },
  311. unshift(/* chunk */) {
  312. // some numpty has read some data that's not for them and they want to put it back!
  313. // Might implement this some day
  314. throw new Error('Not Implemented');
  315. },
  316. wrap(/* stream */) {
  317. // not implemented
  318. throw new Error('Not Implemented');
  319. },
  320. });
  321. module.exports = StreamBuf;