123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- 'use strict';
- var util = require('util');
- var Stream = require('stream');
/**
 * ChunkStream: a writable stream that queues written chunks and serves
 * them to queued read requests.
 *
 * Instance state initialised here:
 *   _buffers  - chunks written but not yet consumed
 *   _buffered - running byte total (kept in sync by write/_processRead*)
 *   _reads    - queued read requests, oldest first
 *   _paused   - whether the writer was last told to stop (see write/read)
 *   _encoding - encoding used by write() for non-Buffer data
 *   writable  - cleared by end() and destroy()
 *
 * The constructor is also assigned to module.exports.
 */
var ChunkStream = module.exports = function() {
  // Run the base Stream constructor on this instance first.
  Stream.call(this);

  // Chunk queue and its byte count.
  this._buffers = [];
  this._buffered = 0;

  // Queue of read requests.
  this._reads = [];
  this._paused = false;

  this._encoding = 'utf8';
  this.writable = true;
};

// Prototype chain: ChunkStream -> Stream.
util.inherits(ChunkStream, Stream);
/**
 * Queue a read request and schedule processing on the next tick.
 *
 * @param {number} length - Number of bytes wanted. A negative value means
 *     "at most |length| bytes": the request is flagged `allowLess` and its
 *     stored length is the absolute value.
 * @param {Function} callback - Stored as the request's `func`; invoked by
 *     the processing methods with the data for this request.
 *
 * On the next tick `_process()` runs; afterwards, if the stream is paused
 * and requests are still queued, it is un-paused and 'drain' is emitted so
 * the writer supplies more data.
 */
ChunkStream.prototype.read = function(length, callback) {
  this._reads.push({
    length: Math.abs(length), // if length < 0 then at most this length
    allowLess: length < 0,
    func: callback
  });

  process.nextTick(function() {
    this._process();

    // It is paused and there is not enough data, so ask for more.
    // _process() may have destroyed the stream (destroy() sets _reads to
    // null), so check that the queue still exists before reading its length;
    // otherwise this throws an uncaught TypeError inside nextTick.
    if (this._paused && this._reads && this._reads.length > 0) {
      this._paused = false;
      this.emit('drain');
    }
  }.bind(this));
};
/**
 * Append a chunk to the internal queue and try to satisfy pending reads.
 *
 * @param {Buffer|string} data - Chunk to enqueue. Buffers are queued as-is
 *     (not copied); anything else is converted with Buffer.from().
 * @param {string} [encoding] - Encoding for non-Buffer data; defaults to
 *     the stream's `_encoding`.
 * @returns {boolean} true when the stream is still writable and not paused;
 *     false otherwise (including when the stream is no longer writable, in
 *     which case an 'error' event is emitted first).
 * @throws {TypeError} when `data` is a value Buffer.from() rejects, such as
 *     a number.
 */
ChunkStream.prototype.write = function(data, encoding) {
  if (!this.writable) {
    this.emit('error', new Error('Stream not writable'));
    return false;
  }

  var dataBuffer;
  if (Buffer.isBuffer(data)) {
    dataBuffer = data;
  }
  else {
    // Buffer.from() replaces the deprecated `new Buffer(data, encoding)`.
    // Unlike the old constructor it never treats a number as a size, so a
    // stray numeric argument fails loudly instead of enqueueing filler bytes.
    dataBuffer = Buffer.from(data, encoding || this._encoding);
  }

  this._buffers.push(dataBuffer);
  this._buffered += dataBuffer.length;

  this._process();

  // Nothing is waiting for data: tell the writer to pause.
  // (_reads is null if _process() ended up destroying the stream.)
  if (this._reads && this._reads.length === 0) {
    this._paused = true;
  }

  return this.writable && !this._paused;
};
/**
 * Finish writing: optionally write a final chunk, mark the stream as no
 * longer writable, then either end immediately (empty queue) or append a
 * `null` end marker to the queue and run `_process()`.
 *
 * @param {Buffer|string} [data] - Optional last chunk, forwarded to write().
 * @param {string} [encoding] - Encoding forwarded to write().
 */
ChunkStream.prototype.end = function(data, encoding) {
  if (data) {
    this.write(data, encoding);
  }

  this.writable = false;

  var queue = this._buffers;

  // destroy() nulls the queue; nothing left to do in that case.
  if (!queue) {
    return;
  }

  // Chunks are still queued: enqueue the end marker and process.
  if (queue.length !== 0) {
    queue.push(null);
    this._process();
    return;
  }

  // Queue is empty: handle the end right away.
  this._end();
};

// destroySoon() is an alias of end().
ChunkStream.prototype.destroySoon = ChunkStream.prototype.end;
/**
 * Handle end of input: if read requests are still queued they can never be
 * satisfied, so an 'error' event is emitted; the stream is then destroyed
 * in either case.
 */
ChunkStream.prototype._end = function() {
  var pendingReads = this._reads.length;

  if (pendingReads > 0) {
    var failure = new Error('Unexpected end of input');
    this.emit('error', failure);
  }

  this.destroy();
};
/**
 * Tear the stream down: mark it non-writable, drop the read and chunk
 * queues (both set to null) and emit 'close'. A null chunk queue marks an
 * already-destroyed stream, so repeated calls do nothing.
 */
ChunkStream.prototype.destroy = function() {
  var isLive = Boolean(this._buffers);

  if (isLive) {
    this.writable = false;
    this._reads = null;
    this._buffers = null;
    this.emit('close');
  }
};
/**
 * Satisfy an "at most N bytes" request from the first queued chunk only.
 *
 * The request is removed from the read queue. If the first chunk is longer
 * than the requested maximum, the leading `read.length` bytes are handed
 * out and the remainder stays at the front of the queue; otherwise the
 * whole chunk (the very same Buffer object) is handed out and removed.
 *
 * @param {{length: number, func: Function}} read - The request at the head
 *     of the read queue; `func` is called with `this` set to the stream.
 */
ChunkStream.prototype._processReadAllowingLess = function(read) {
  // Drop the request being served (it is the head of the queue).
  this._reads.shift();

  var head = this._buffers[0];
  var limit = read.length;
  var chunk;

  if (head.length <= limit) {
    // The whole first chunk fits within the limit: consume it entirely.
    this._buffered -= head.length;
    this._buffers.shift();
    chunk = head;
  }
  else {
    // First chunk is bigger than the limit: split it.
    this._buffered -= limit;
    this._buffers[0] = head.slice(limit);
    chunk = head.slice(0, limit);
  }

  read.func.call(this, chunk);
};
/**
 * Satisfy an exact-length request by gathering bytes from as many queued
 * chunks as needed into one new Buffer.
 *
 * The request is removed from the read queue, fully consumed chunks are
 * removed from the chunk queue, a partially consumed chunk is replaced by
 * its unread tail, and the byte counter is reduced by `read.length`.
 * The caller (`_process`) only invokes this when at least `read.length`
 * bytes are buffered.
 *
 * @param {{length: number, func: Function}} read - The request at the head
 *     of the read queue; `func` is called with `this` set to the stream and
 *     the assembled Buffer as its argument.
 */
ChunkStream.prototype._processRead = function(read) {
  this._reads.shift(); // == read

  var pos = 0;    // bytes copied into `data` so far
  var count = 0;  // number of leading chunks fully consumed

  // Buffer.alloc() replaces the deprecated `new Buffer(size)` constructor
  // and always returns zero-filled memory.
  var data = Buffer.alloc(read.length);

  // Copy from consecutive chunks until the request is filled.
  while (pos < read.length) {
    var buf = this._buffers[count++];
    var len = Math.min(buf.length, read.length - pos);

    buf.copy(data, pos, 0, len);
    pos += len;

    // The last chunk was only partly used: keep its tail in the queue and
    // step `count` back so the tail is not removed below.
    if (len !== buf.length) {
      this._buffers[--count] = buf.slice(len);
    }
  }

  // Remove all fully used chunks.
  if (count > 0) {
    this._buffers.splice(0, count);
  }

  this._buffered -= read.length;

  read.func.call(this, data);
};
/**
 * Serve queued read requests, oldest first, for as long as data is
 * buffered.
 *
 * An `allowLess` request is served from whatever is available; an exact
 * request is served only once enough bytes are buffered, otherwise
 * processing stops and waits for more data. Afterwards, if the stream has
 * not been destroyed and is no longer writable, end-of-input handling runs.
 * Any exception thrown along the way is re-emitted as an 'error' event.
 */
ChunkStream.prototype._process = function() {
  try {
    // Keep going while there is both data and a pending request.
    // (_reads becomes null if the stream is destroyed mid-loop.)
    while (this._buffered > 0 && this._reads && this._reads.length > 0) {
      var next = this._reads[0];

      var canServe = next.allowLess || this._buffered >= next.length;
      if (!canServe) {
        // Not enough data to satisfy the first request in the queue,
        // so wait for more.
        break;
      }

      if (next.allowLess) {
        // Hand out any available data (but no more than the limit).
        this._processReadAllowingLess(next);
      }
      else {
        // Enough bytes are buffered for an exact read.
        this._processRead(next);
      }
    }

    var stillAlive = this._buffers;
    if (stillAlive && !this.writable) {
      this._end();
    }
  }
  catch (err) {
    this.emit('error', err);
  }
};
|