123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- 'use strict';
- var util = require('util');
- var Stream = require('stream');
- var ChunkStream = module.exports = function() {
- Stream.call(this);
- this._buffers = [];
- this._buffered = 0;
- this._reads = [];
- this._paused = false;
- this._encoding = 'utf8';
- this.writable = true;
- };
- util.inherits(ChunkStream, Stream);
- ChunkStream.prototype.read = function(length, callback) {
- this._reads.push({
- length: Math.abs(length),
- allowLess: length < 0,
- func: callback
- });
- process.nextTick(function() {
- this._process();
-
- if (this._paused && this._reads.length > 0) {
- this._paused = false;
- this.emit('drain');
- }
- }.bind(this));
- };
- ChunkStream.prototype.write = function(data, encoding) {
- if (!this.writable) {
- this.emit('error', new Error('Stream not writable'));
- return false;
- }
- var dataBuffer;
- if (Buffer.isBuffer(data)) {
- dataBuffer = data;
- }
- else {
- dataBuffer = new Buffer(data, encoding || this._encoding);
- }
- this._buffers.push(dataBuffer);
- this._buffered += dataBuffer.length;
- this._process();
-
- if (this._reads && this._reads.length === 0) {
- this._paused = true;
- }
- return this.writable && !this._paused;
- };
- ChunkStream.prototype.end = function(data, encoding) {
- if (data) {
- this.write(data, encoding);
- }
- this.writable = false;
-
- if (!this._buffers) {
- return;
- }
-
- if (this._buffers.length === 0) {
- this._end();
- }
- else {
- this._buffers.push(null);
- this._process();
- }
- };
- ChunkStream.prototype.destroySoon = ChunkStream.prototype.end;
- ChunkStream.prototype._end = function() {
- if (this._reads.length > 0) {
- this.emit('error',
- new Error('Unexpected end of input')
- );
- }
- this.destroy();
- };
- ChunkStream.prototype.destroy = function() {
- if (!this._buffers) {
- return;
- }
- this.writable = false;
- this._reads = null;
- this._buffers = null;
- this.emit('close');
- };
- ChunkStream.prototype._processReadAllowingLess = function(read) {
-
- this._reads.shift();
-
- var smallerBuf = this._buffers[0];
-
- if (smallerBuf.length > read.length) {
- this._buffered -= read.length;
- this._buffers[0] = smallerBuf.slice(read.length);
- read.func.call(this, smallerBuf.slice(0, read.length));
- }
- else {
-
- this._buffered -= smallerBuf.length;
- this._buffers.shift();
- read.func.call(this, smallerBuf);
- }
- };
- ChunkStream.prototype._processRead = function(read) {
- this._reads.shift();
- var pos = 0;
- var count = 0;
- var data = new Buffer(read.length);
-
- while (pos < read.length) {
- var buf = this._buffers[count++];
- var len = Math.min(buf.length, read.length - pos);
- buf.copy(data, pos, 0, len);
- pos += len;
-
- if (len !== buf.length) {
- this._buffers[--count] = buf.slice(len);
- }
- }
-
- if (count > 0) {
- this._buffers.splice(0, count);
- }
- this._buffered -= read.length;
- read.func.call(this, data);
- };
- ChunkStream.prototype._process = function() {
- try {
-
- while (this._buffered > 0 && this._reads && this._reads.length > 0) {
- var read = this._reads[0];
-
- if (read.allowLess) {
- this._processReadAllowingLess(read);
- }
- else if (this._buffered >= read.length) {
-
- this._processRead(read);
- }
- else {
-
-
- break;
- }
- }
- if (this._buffers && !this.writable) {
- this._end();
- }
- }
- catch (ex) {
- this.emit('error', ex);
- }
- };
|