/* eslint-disable max-classes-per-file */
const Stream = require('readable-stream');
const utils = require('./utils');
const StringBuf = require('./string-buf');

// =============================================================================
// data chunks - encapsulating incoming data
// Wraps a string (or ArrayBuffer) and converts it to a Buffer lazily,
// only when the bytes are actually needed
class StringChunk {
  constructor(data, encoding) {
    this._data = data;
    this._encoding = encoding;
  }

  get length() {
    return this.toBuffer().length;
  }

  // copy to target buffer
  copy(target, targetOffset, offset, length) {
    return this.toBuffer().copy(target, targetOffset, offset, length);
  }

  toBuffer() {
    if (!this._buffer) {
      this._buffer = Buffer.from(this._data, this._encoding);
    }
    return this._buffer;
  }
}
// Wraps a StringBuf, copying directly from its internal buffer
class StringBufChunk {
  constructor(data) {
    this._data = data;
  }

  get length() {
    return this._data.length;
  }

  // copy to target buffer
  copy(target, targetOffset, offset, length) {
    // eslint-disable-next-line no-underscore-dangle
    return this._data._buf.copy(target, targetOffset, offset, length);
  }

  toBuffer() {
    return this._data.toBuffer();
  }
}
// Wraps a Buffer as-is
class BufferChunk {
  constructor(data) {
    this._data = data;
  }

  get length() {
    return this._data.length;
  }

  // copy to target buffer
  copy(target, targetOffset, offset, length) {
    this._data.copy(target, targetOffset, offset, length);
  }

  toBuffer() {
    return this._data;
  }
}
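
// Illustrative sketch (not part of the original module): the three chunk
// classes above share one duck-typed interface - length, copy(target,
// targetOffset, offset, length) and toBuffer() - so the stream code below
// never needs to know which input type it received. Hypothetical values:
//
//   const chunk = new BufferChunk(Buffer.from('hello'));
//   const target = Buffer.alloc(chunk.length);
//   chunk.copy(target, 0, 0, chunk.length); // target now holds 'hello'
//   chunk.toBuffer().toString();            // 'hello'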
// =============================================================================
// ReadWriteBuf - a single buffer supporting simple read-write
class ReadWriteBuf {
  constructor(size) {
    this.size = size;
    // the buffer
    this.buffer = Buffer.alloc(size);
    // read index
    this.iRead = 0;
    // write index
    this.iWrite = 0;
  }

  toBuffer() {
    if (this.iRead === 0 && this.iWrite === this.size) {
      return this.buffer;
    }

    const buf = Buffer.alloc(this.iWrite - this.iRead);
    this.buffer.copy(buf, 0, this.iRead, this.iWrite);
    return buf;
  }

  get length() {
    return this.iWrite - this.iRead;
  }

  get eod() {
    return this.iRead === this.iWrite;
  }

  get full() {
    return this.iWrite === this.size;
  }

  read(size) {
    let buf;
    // read size bytes from buffer and return buffer
    if (size === 0) {
      // special case - return null if no data requested
      return null;
    }

    if (size === undefined || size >= this.length) {
      // if no size specified or size is at least what we have then return all of the bytes
      buf = this.toBuffer();
      this.iRead = this.iWrite;
      return buf;
    }

    // otherwise return a chunk; note the source range must be offset by iRead -
    // copying [iRead, size) would return the wrong bytes once iRead > 0
    buf = Buffer.alloc(size);
    this.buffer.copy(buf, 0, this.iRead, this.iRead + size);
    this.iRead += size;
    return buf;
  }

  write(chunk, offset, length) {
    // write as many bytes as will fit, from the optional source offset,
    // and return the number of bytes written
    const size = Math.min(length, this.size - this.iWrite);
    chunk.copy(this.buffer, this.iWrite, offset, offset + size);
    this.iWrite += size;
    return size;
  }
}
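
// Illustrative sketch (hypothetical values): a ReadWriteBuf round-trip.
// write() returns how many bytes actually fit, so callers can loop until a
// chunk is fully consumed; read() without a size drains everything unread.
//
//   const rw = new ReadWriteBuf(8);
//   rw.write(new BufferChunk(Buffer.from('abcdefghij')), 0, 10); // returns 8 - buffer full
//   rw.read(3); // <Buffer 'abc'> - 'defgh' still unread
//   rw.read();  // <Buffer 'defgh'> - rw.eod is now true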
// =============================================================================
// StreamBuf - a multi-purpose read-write stream
// As MemBuf - write as much data as you like, then call toBuffer() to consolidate
// As StreamHub - pipe to multiple writables
// As readable stream - feed data into the writable part and have some other code read from it
// Note: Not sure why, but StreamBuf does not like the JS "class" sugar - it
// fails the integration tests
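
// Usage sketch for the MemBuf mode described above (illustrative only):
// write any mix of supported types, then consolidate with toBuffer().
//
//   const memBuf = new StreamBuf();
//   memBuf.write('Hello, ');
//   memBuf.write(Buffer.from('World!'));
//   memBuf.toBuffer().toString(); // 'Hello, World!'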
const StreamBuf = function(options) {
  options = options || {};
  this.bufSize = options.bufSize || 1024 * 1024;
  this.buffers = [];

  // batch mode fills a buffer completely before passing the data on
  // to pipes or 'readable' event listeners
  this.batch = options.batch || false;

  this.corked = false;
  // where in the current writable buffer we're up to
  this.inPos = 0;
  // where in the current readable buffer we've read up to
  this.outPos = 0;
  // consuming pipe streams go here
  this.pipes = [];
  // controls emit('data')
  this.paused = false;
  this.encoding = null;
};
utils.inherits(StreamBuf, Stream.Duplex, {
  toBuffer() {
    switch (this.buffers.length) {
      case 0:
        return null;
      case 1:
        return this.buffers[0].toBuffer();
      default:
        return Buffer.concat(this.buffers.map(rwBuf => rwBuf.toBuffer()));
    }
  },

  // writable
  // event drain - if write returns false (which it won't), indicates when safe to write again.
  // finish - end() has been called
  // pipe(src) - pipe() has been called on readable
  // unpipe(src) - unpipe() has been called on readable
  // error - duh
  _getWritableBuffer() {
    if (this.buffers.length) {
      const last = this.buffers[this.buffers.length - 1];
      if (!last.full) {
        return last;
      }
    }
    const buf = new ReadWriteBuf(this.bufSize);
    this.buffers.push(buf);
    return buf;
  },
  async _pipe(chunk) {
    // write this chunk to every attached pipe, resolving when all have accepted it
    const write = function(pipe) {
      return new Promise(resolve => {
        pipe.write(chunk.toBuffer(), () => {
          resolve();
        });
      });
    };
    await Promise.all(this.pipes.map(write));
  },

  _writeToBuffers(chunk) {
    let inPos = 0;
    const inLen = chunk.length;
    while (inPos < inLen) {
      // find writable buffer
      const buffer = this._getWritableBuffer();

      // write some data
      inPos += buffer.write(chunk, inPos, inLen - inPos);
    }
  },
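
  // Illustrative note (hypothetical values): because ReadWriteBuf.write()
  // reports how many bytes it accepted, _writeToBuffers transparently spans
  // a chunk across buffer boundaries:
  //
  //   const sb = new StreamBuf({bufSize: 4});
  //   sb._writeToBuffers(new BufferChunk(Buffer.from('abcdefghij')));
  //   sb.buffers.length; // 3 - 'abcd', 'efgh' and 'ij'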
  async write(data, encoding, callback) {
    if (encoding instanceof Function) {
      callback = encoding;
      encoding = 'utf8';
    }
    callback = callback || utils.nop;

    // encapsulate data into a chunk
    let chunk;
    if (data instanceof StringBuf) {
      chunk = new StringBufChunk(data);
    } else if (data instanceof Buffer) {
      chunk = new BufferChunk(data);
    } else if (typeof data === 'string' || data instanceof String || data instanceof ArrayBuffer) {
      chunk = new StringChunk(data, encoding);
    } else {
      throw new Error('Chunk must be one of type String, Buffer, ArrayBuffer or StringBuf.');
    }

    // now, do something with the chunk
    if (this.pipes.length) {
      if (this.batch) {
        this._writeToBuffers(chunk);
        while (!this.corked && this.buffers.length > 1) {
          // eslint-disable-next-line no-await-in-loop
          await this._pipe(this.buffers.shift());
        }
        callback();
      } else if (!this.corked) {
        await this._pipe(chunk);
        callback();
      } else {
        this._writeToBuffers(chunk);
        process.nextTick(callback);
      }
    } else {
      if (!this.paused) {
        this.emit('data', chunk.toBuffer());
      }

      this._writeToBuffers(chunk);
      this.emit('readable');
    }

    return true;
  },
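
  // Illustrative sketch (hypothetical values): write() accepts strings (with
  // an optional encoding), Buffers, ArrayBuffers and StringBuf instances,
  // wrapping each in the matching chunk class above:
  //
  //   const sb = new StreamBuf();
  //   await sb.write('caf\u00e9', 'utf8');    // StringChunk
  //   await sb.write(Buffer.from([1, 2, 3])); // BufferChunk
  //   await sb.write(42);                     // throws - unsupported chunk type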
  cork() {
    this.corked = true;
  },

  _flush(/* destination */) {
    // if we have consumers...
    if (this.pipes.length) {
      // ...and there's stuff not written
      while (this.buffers.length) {
        this._pipe(this.buffers.shift());
      }
    }
  },

  uncork() {
    this.corked = false;
    this._flush();
  },
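
  // Illustrative sketch (sb is a StreamBuf, dest a hypothetical writable
  // stream): while corked, writes accumulate in this.buffers even when pipes
  // are attached; uncork() flushes the backlog to every pipe.
  //
  //   sb.pipe(dest);
  //   sb.cork();
  //   sb.write('a');
  //   sb.write('b'); // nothing has reached dest yet
  //   sb.uncork();   // 'a' and 'b' are now pushed to dest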
  end(chunk, encoding, callback) {
    if (encoding instanceof Function) {
      callback = encoding;
      encoding = undefined;
    }
    const writeComplete = error => {
      if (error) {
        if (callback) callback(error);
      } else {
        this._flush();
        this.pipes.forEach(pipe => {
          pipe.end();
        });
        this.emit('finish');
        if (callback) callback();
      }
    };
    if (chunk) {
      this.write(chunk, encoding, writeComplete);
    } else {
      writeComplete();
    }
  },
  // readable
  // event readable - some data is now available
  // event data - switch to flowing mode - feeds chunks to handler
  // event end - no more data
  // event close - optional, indicates upstream close
  // event error - duh
  read(size) {
    let buffers;

    // read min(buffered, size || infinity)
    if (size) {
      buffers = [];
      while (size && this.buffers.length && !this.buffers[0].eod) {
        const first = this.buffers[0];
        const buffer = first.read(size);
        size -= buffer.length;
        buffers.push(buffer);
        if (first.eod && first.full) {
          this.buffers.shift();
        }
      }
      return Buffer.concat(buffers);
    }

    buffers = this.buffers.map(buf => buf.toBuffer()).filter(Boolean);
    this.buffers = [];
    return Buffer.concat(buffers);
  },
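
  // Illustrative sketch (hypothetical values): read(size) consumes from the
  // front of the buffer queue; read() with no size drains everything.
  //
  //   const sb = new StreamBuf();
  //   sb.write('abcdef');
  //   sb.read(2); // <Buffer 'ab'>
  //   sb.read(2); // <Buffer 'cd'>
  //   sb.read();  // <Buffer 'ef'>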
  setEncoding(encoding) {
    // causes stream.read or stream.on('data') to return strings of the given
    // encoding instead of Buffer objects
    this.encoding = encoding;
  },

  pause() {
    this.paused = true;
  },

  resume() {
    this.paused = false;
  },

  isPaused() {
    return !!this.paused;
  },
  pipe(destination) {
    // add destination to pipe list & write current buffer
    this.pipes.push(destination);
    if (!this.paused && this.buffers.length) {
      this.end();
    }
  },

  unpipe(destination) {
    // remove destination from pipe list
    this.pipes = this.pipes.filter(pipe => pipe !== destination);
  },

  unshift(/* chunk */) {
    // some numpty has read some data that's not for them and they want to put it back!
    // Might implement this some day
    throw new Error('Not Implemented');
  },

  wrap(/* stream */) {
    // not implemented
    throw new Error('Not Implemented');
  },
});
module.exports = StreamBuf;
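
// Illustrative end-to-end sketch (not part of the module): StreamHub mode,
// fanning one writer out to several destinations. File names are
// hypothetical:
//
//   const fs = require('fs');
//   const hub = new StreamBuf();
//   hub.pipe(fs.createWriteStream('copy1.bin'));
//   hub.pipe(fs.createWriteStream('copy2.bin'));
//   await hub.write(Buffer.from('broadcast me'));
//   hub.end(); // flushes remaining data and ends both destinations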