stream-buf.js 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. "use strict";
  2. /* eslint-disable max-classes-per-file */
  3. const Stream = require('readable-stream');
  4. const utils = require('./utils');
  5. const StringBuf = require('./string-buf');
  6. // =============================================================================
  7. // data chunks - encapsulating incoming data
  8. class StringChunk {
  9. constructor(data, encoding) {
  10. this._data = data;
  11. this._encoding = encoding;
  12. }
  13. get length() {
  14. return this.toBuffer().length;
  15. }
  16. // copy to target buffer
  17. copy(target, targetOffset, offset, length) {
  18. return this.toBuffer().copy(target, targetOffset, offset, length);
  19. }
  20. toBuffer() {
  21. if (!this._buffer) {
  22. this._buffer = Buffer.from(this._data, this._encoding);
  23. }
  24. return this._buffer;
  25. }
  26. }
  27. class StringBufChunk {
  28. constructor(data) {
  29. this._data = data;
  30. }
  31. get length() {
  32. return this._data.length;
  33. }
  34. // copy to target buffer
  35. copy(target, targetOffset, offset, length) {
  36. // eslint-disable-next-line no-underscore-dangle
  37. return this._data._buf.copy(target, targetOffset, offset, length);
  38. }
  39. toBuffer() {
  40. return this._data.toBuffer();
  41. }
  42. }
  43. class BufferChunk {
  44. constructor(data) {
  45. this._data = data;
  46. }
  47. get length() {
  48. return this._data.length;
  49. }
  50. // copy to target buffer
  51. copy(target, targetOffset, offset, length) {
  52. this._data.copy(target, targetOffset, offset, length);
  53. }
  54. toBuffer() {
  55. return this._data;
  56. }
  57. }
  58. // =============================================================================
  59. // ReadWriteBuf - a single buffer supporting simple read-write
  60. class ReadWriteBuf {
  61. constructor(size) {
  62. this.size = size;
  63. // the buffer
  64. this.buffer = Buffer.alloc(size);
  65. // read index
  66. this.iRead = 0;
  67. // write index
  68. this.iWrite = 0;
  69. }
  70. toBuffer() {
  71. if (this.iRead === 0 && this.iWrite === this.size) {
  72. return this.buffer;
  73. }
  74. const buf = Buffer.alloc(this.iWrite - this.iRead);
  75. this.buffer.copy(buf, 0, this.iRead, this.iWrite);
  76. return buf;
  77. }
  78. get length() {
  79. return this.iWrite - this.iRead;
  80. }
  81. get eod() {
  82. return this.iRead === this.iWrite;
  83. }
  84. get full() {
  85. return this.iWrite === this.size;
  86. }
  87. read(size) {
  88. let buf;
  89. // read size bytes from buffer and return buffer
  90. if (size === 0) {
  91. // special case - return null if no data requested
  92. return null;
  93. }
  94. if (size === undefined || size >= this.length) {
  95. // if no size specified or size is at least what we have then return all of the bytes
  96. buf = this.toBuffer();
  97. this.iRead = this.iWrite;
  98. return buf;
  99. }
  100. // otherwise return a chunk
  101. buf = Buffer.alloc(size);
  102. this.buffer.copy(buf, 0, this.iRead, size);
  103. this.iRead += size;
  104. return buf;
  105. }
  106. write(chunk, offset, length) {
  107. // write as many bytes from data from optional source offset
  108. // and return number of bytes written
  109. const size = Math.min(length, this.size - this.iWrite);
  110. chunk.copy(this.buffer, this.iWrite, offset, offset + size);
  111. this.iWrite += size;
  112. return size;
  113. }
  114. }
  115. // =============================================================================
  116. // StreamBuf - a multi-purpose read-write stream
  117. // As MemBuf - write as much data as you like. Then call toBuffer() to consolidate
  118. // As StreamHub - pipe to multiple writables
  119. // As readable stream - feed data into the writable part and have some other code read from it.
  120. // Note: Not sure why but StreamBuf does not like JS "class" sugar. It fails the
  121. // integration tests
  122. const StreamBuf = function (options) {
  123. options = options || {};
  124. this.bufSize = options.bufSize || 1024 * 1024;
  125. this.buffers = [];
  126. // batch mode fills a buffer completely before passing the data on
  127. // to pipes or 'readable' event listeners
  128. this.batch = options.batch || false;
  129. this.corked = false;
  130. // where in the current writable buffer we're up to
  131. this.inPos = 0;
  132. // where in the current readable buffer we've read up to
  133. this.outPos = 0;
  134. // consuming pipe streams go here
  135. this.pipes = [];
  136. // controls emit('data')
  137. this.paused = false;
  138. this.encoding = null;
  139. };
  140. utils.inherits(StreamBuf, Stream.Duplex, {
  141. toBuffer() {
  142. switch (this.buffers.length) {
  143. case 0:
  144. return null;
  145. case 1:
  146. return this.buffers[0].toBuffer();
  147. default:
  148. return Buffer.concat(this.buffers.map(rwBuf => rwBuf.toBuffer()));
  149. }
  150. },
  151. // writable
  152. // event drain - if write returns false (which it won't), indicates when safe to write again.
  153. // finish - end() has been called
  154. // pipe(src) - pipe() has been called on readable
  155. // unpipe(src) - unpipe() has been called on readable
  156. // error - duh
  157. _getWritableBuffer() {
  158. if (this.buffers.length) {
  159. const last = this.buffers[this.buffers.length - 1];
  160. if (!last.full) {
  161. return last;
  162. }
  163. }
  164. const buf = new ReadWriteBuf(this.bufSize);
  165. this.buffers.push(buf);
  166. return buf;
  167. },
  168. async _pipe(chunk) {
  169. const write = function (pipe) {
  170. return new Promise(resolve => {
  171. pipe.write(chunk.toBuffer(), () => {
  172. resolve();
  173. });
  174. });
  175. };
  176. await Promise.all(this.pipes.map(write));
  177. },
  178. _writeToBuffers(chunk) {
  179. let inPos = 0;
  180. const inLen = chunk.length;
  181. while (inPos < inLen) {
  182. // find writable buffer
  183. const buffer = this._getWritableBuffer();
  184. // write some data
  185. inPos += buffer.write(chunk, inPos, inLen - inPos);
  186. }
  187. },
  188. async write(data, encoding, callback) {
  189. if (encoding instanceof Function) {
  190. callback = encoding;
  191. encoding = 'utf8';
  192. }
  193. callback = callback || utils.nop;
  194. // encapsulate data into a chunk
  195. let chunk;
  196. if (data instanceof StringBuf) {
  197. chunk = new StringBufChunk(data);
  198. } else if (data instanceof Buffer) {
  199. chunk = new BufferChunk(data);
  200. } else if (typeof data === 'string' || data instanceof String || data instanceof ArrayBuffer) {
  201. chunk = new StringChunk(data, encoding);
  202. } else {
  203. throw new Error('Chunk must be one of type String, Buffer or StringBuf.');
  204. }
  205. // now, do something with the chunk
  206. if (this.pipes.length) {
  207. if (this.batch) {
  208. this._writeToBuffers(chunk);
  209. while (!this.corked && this.buffers.length > 1) {
  210. this._pipe(this.buffers.shift());
  211. }
  212. } else if (!this.corked) {
  213. await this._pipe(chunk);
  214. callback();
  215. } else {
  216. this._writeToBuffers(chunk);
  217. process.nextTick(callback);
  218. }
  219. } else {
  220. if (!this.paused) {
  221. this.emit('data', chunk.toBuffer());
  222. }
  223. this._writeToBuffers(chunk);
  224. this.emit('readable');
  225. }
  226. return true;
  227. },
  228. cork() {
  229. this.corked = true;
  230. },
  231. _flush( /* destination */
  232. ) {
  233. // if we have comsumers...
  234. if (this.pipes.length) {
  235. // and there's stuff not written
  236. while (this.buffers.length) {
  237. this._pipe(this.buffers.shift());
  238. }
  239. }
  240. },
  241. uncork() {
  242. this.corked = false;
  243. this._flush();
  244. },
  245. end(chunk, encoding, callback) {
  246. const writeComplete = error => {
  247. if (error) {
  248. callback(error);
  249. } else {
  250. this._flush();
  251. this.pipes.forEach(pipe => {
  252. pipe.end();
  253. });
  254. this.emit('finish');
  255. }
  256. };
  257. if (chunk) {
  258. this.write(chunk, encoding, writeComplete);
  259. } else {
  260. writeComplete();
  261. }
  262. },
  263. // readable
  264. // event readable - some data is now available
  265. // event data - switch to flowing mode - feeds chunks to handler
  266. // event end - no more data
  267. // event close - optional, indicates upstream close
  268. // event error - duh
  269. read(size) {
  270. let buffers;
  271. // read min(buffer, size || infinity)
  272. if (size) {
  273. buffers = [];
  274. while (size && this.buffers.length && !this.buffers[0].eod) {
  275. const first = this.buffers[0];
  276. const buffer = first.read(size);
  277. size -= buffer.length;
  278. buffers.push(buffer);
  279. if (first.eod && first.full) {
  280. this.buffers.shift();
  281. }
  282. }
  283. return Buffer.concat(buffers);
  284. }
  285. buffers = this.buffers.map(buf => buf.toBuffer()).filter(Boolean);
  286. this.buffers = [];
  287. return Buffer.concat(buffers);
  288. },
  289. setEncoding(encoding) {
  290. // causes stream.read or stream.on('data) to return strings of encoding instead of Buffer objects
  291. this.encoding = encoding;
  292. },
  293. pause() {
  294. this.paused = true;
  295. },
  296. resume() {
  297. this.paused = false;
  298. },
  299. isPaused() {
  300. return !!this.paused;
  301. },
  302. pipe(destination) {
  303. // add destination to pipe list & write current buffer
  304. this.pipes.push(destination);
  305. if (!this.paused && this.buffers.length) {
  306. this.end();
  307. }
  308. },
  309. unpipe(destination) {
  310. // remove destination from pipe list
  311. this.pipes = this.pipes.filter(pipe => pipe !== destination);
  312. },
  313. unshift( /* chunk */
  314. ) {
  315. // some numpty has read some data that's not for them and they want to put it back!
  316. // Might implement this some day
  317. throw new Error('Not Implemented');
  318. },
  319. wrap( /* stream */
  320. ) {
  321. // not implemented
  322. throw new Error('Not Implemented');
  323. }
  324. });
  325. module.exports = StreamBuf;
  326. //# sourceMappingURL=stream-buf.js.map