Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
81146 views
1
var util = require('util');
2
var Stream = require('stream').Stream;
3
var DelayedStream = require('delayed-stream');
4
5
module.exports = CombinedStream;
6
/**
 * Readable, legacy-style stream that emits the content of several queued
 * entries one after another.
 *
 * The public fields set here act as defaults; `CombinedStream.create` can
 * override any of them:
 *   - maxDataSize:  limit that `_checkDataSize` compares `dataSize` against
 *   - pauseStreams: whether appended streams are paused until their turn
 */
function CombinedStream() {
  var MEBIBYTE = 1024 * 1024;

  // Stream-facing flags; `writable` is switched on by the first `resume()`.
  this.writable = false;
  this.readable = true;

  // Size bookkeeping and tunables.
  this.dataSize = 0;
  this.maxDataSize = 2 * MEBIBYTE;
  this.pauseStreams = true;

  // Internal queue state.
  this._released = false;
  this._streams = [];
  this._currentStream = null;
}

// Give instances the classic Stream (and therefore EventEmitter) API.
util.inherits(CombinedStream, Stream);
18
19
/**
 * Factory: builds a new instance and copies every enumerable property of
 * `options` onto it, overriding the constructor defaults.
 *
 * The instance is created with `new this()`, so calling this function as a
 * static method of a subclass yields an instance of that subclass.
 *
 * @param {Object} [options] property overrides (e.g. `maxDataSize`, `pauseStreams`)
 * @returns {CombinedStream} the configured instance
 */
CombinedStream.create = function(options) {
  var instance = new this();
  var overrides = options || {};

  // for...in is intentional: inherited enumerable properties are copied too.
  for (var key in overrides) {
    instance[key] = overrides[key];
  }

  return instance;
};
29
30
/**
 * Tells whether `stream` should be handled as a stream rather than as a
 * literal chunk or a function.
 *
 * Functions, strings, booleans, numbers and Buffers are NOT stream-like;
 * every other value is reported as stream-like.
 *
 * @param {*} stream candidate value
 * @returns {boolean} true when the value is none of the excluded types
 */
CombinedStream.isStreamLike = function(stream) {
  // The whole conjunction lives inside one parenthesised expression so that
  // nothing between the operands can terminate the `return` early.
  return (
    typeof stream !== 'function' &&
    typeof stream !== 'string' &&
    typeof stream !== 'boolean' &&
    typeof stream !== 'number' &&
    !Buffer.isBuffer(stream)
  );
};
37
38
/**
 * Queues `stream` so that it is emitted after everything appended before it.
 *
 * Stream-like values are wrapped with `DelayedStream.create` (unless they are
 * already a DelayedStream), have their 'error' events forwarded through
 * `_handleErrors` and, when `pauseStreams` is set, are paused. Every other
 * value (string, Buffer, number, boolean or function) is queued untouched.
 *
 * @param {*} stream value, stream or function to enqueue
 * @returns {CombinedStream} this instance, for chaining
 */
CombinedStream.prototype.append = function(stream) {
  var isStreamLike = CombinedStream.isStreamLike(stream);

  if (isStreamLike) {
    if (!(stream instanceof DelayedStream)) {
      var newStream = DelayedStream.create(stream, {
        maxDataSize: Infinity,
        pauseStream: this.pauseStreams,
      });
      // Listen on the source itself so each chunk triggers a size check.
      stream.on('data', this._checkDataSize.bind(this));
      stream = newStream;
    }

    this._handleErrors(stream);

    if (this.pauseStreams) {
      stream.pause();
    }
  }

  this._streams.push(stream);
  return this;
};
61
62
/**
 * Pipes this stream into `dest` using the inherited `Stream#pipe`, then calls
 * `resume()` so emission starts (or continues).
 *
 * @param {Stream} dest destination stream
 * @param {Object} [options] forwarded unchanged to `Stream#pipe`
 * @returns {Stream} `dest`, so pipes can be chained
 */
CombinedStream.prototype.pipe = function(dest, options) {
  var inheritedPipe = Stream.prototype.pipe;

  inheritedPipe.call(this, dest, options);
  this.resume();

  return dest;
};
67
68
/**
 * Advances to the next queued entry, or ends the stream when the queue is
 * empty.
 *
 * Non-stream entries are written synchronously and `_pipeNext` immediately
 * calls `_getNext` again. To keep a long run of such entries from growing the
 * call stack, a re-entrant call only records that another step is wanted and
 * the outermost call drains the queue in a loop.
 *
 * `_insideLoop` and `_pendingNext` are private flags created lazily here;
 * they are falsy until first used.
 */
CombinedStream.prototype._getNext = function() {
  this._currentStream = null;

  if (this._insideLoop) {
    // Re-entered while dispatching: let the outer loop take the next entry.
    this._pendingNext = true;
    return;
  }

  this._insideLoop = true;
  try {
    do {
      this._pendingNext = false;
      this._realGetNext();
    } while (this._pendingNext);
  } finally {
    // Release the guard even if a listener threw during dispatch.
    this._insideLoop = false;
  }
};

/**
 * Private helper of `_getNext`: removes one entry from the queue and
 * dispatches it.
 *
 *  - empty queue       -> `end()`
 *  - non-function      -> `_pipeNext(entry)`
 *  - function          -> called with a callback; the value handed to that
 *                         callback is wired up (if stream-like) and passed to
 *                         `_pipeNext`
 */
CombinedStream.prototype._realGetNext = function() {
  var entry = this._streams.shift();

  if (typeof entry === 'undefined') {
    this.end();
    return;
  }

  if (typeof entry !== 'function') {
    this._pipeNext(entry);
    return;
  }

  var getStream = entry;
  getStream(function(resolved) {
    if (CombinedStream.isStreamLike(resolved)) {
      resolved.on('data', this._checkDataSize.bind(this));
      this._handleErrors(resolved);
    }

    this._pipeNext(resolved);
  }.bind(this));
};
94
95
/**
 * Makes `stream` the current entry and emits it.
 *
 * A stream-like entry is piped into this stream with `{end: false}`, and
 * `_getNext` is registered on its 'end' event. Any other entry is written as
 * a single chunk and `_getNext` is called right away.
 *
 * @param {*} stream entry taken from the queue
 */
CombinedStream.prototype._pipeNext = function(stream) {
  this._currentStream = stream;

  if (!CombinedStream.isStreamLike(stream)) {
    // Literal chunk: emit it as-is and move on immediately.
    this.write(stream);
    this._getNext();
    return;
  }

  var advance = this._getNext.bind(this);
  stream.on('end', advance);
  stream.pipe(this, {end: false});
};
109
110
/**
 * Subscribes to `stream`'s 'error' event and forwards the error object to
 * `_emitError` on this instance.
 *
 * @param {Object} stream object exposing `on(event, listener)`
 */
CombinedStream.prototype._handleErrors = function(stream) {
  var combined = this;

  // Only the first argument of the event is passed on.
  function forwardError(err) {
    combined._emitError(err);
  }

  stream.on('error', forwardError);
};
116
117
/**
 * Re-emits `data` to consumers as a 'data' event. Returns nothing.
 *
 * @param {*} data chunk to emit
 */
CombinedStream.prototype.write = function(data) {
  var chunk = data;

  this.emit('data', chunk);
};
120
121
/**
 * Pauses the entry currently being emitted (when it has a `pause` method)
 * and emits 'pause'.
 *
 * Does nothing at all, not even emitting the event, when `pauseStreams` is
 * falsy.
 */
CombinedStream.prototype.pause = function() {
  if (!this.pauseStreams) {
    return;
  }

  var current = this._currentStream;
  if (current && typeof current.pause === 'function') {
    current.pause();
  }

  this.emit('pause');
};
129
130
/**
 * Starts emission on the first call and resumes the current entry on every
 * call, then emits 'resume'.
 *
 * The first call marks the stream as released and writable and fetches the
 * first queued entry. The current entry's own `resume` is only invoked when
 * `pauseStreams` is set and the entry has such a method.
 */
CombinedStream.prototype.resume = function() {
  var firstCall = !this._released;

  if (firstCall) {
    this._released = true;
    this.writable = true;
    this._getNext();
  }

  // Read after `_getNext`, which may just have installed a new current entry.
  var current = this._currentStream;
  if (this.pauseStreams && current && typeof current.resume === 'function') {
    current.resume();
  }

  this.emit('resume');
};
140
141
/**
 * Finishes the stream: clears internal state through `_reset()` and then
 * emits 'end'.
 */
CombinedStream.prototype.end = function() {
  var self = this;

  self._reset();
  self.emit('end');
};
145
146
/**
 * Tears the stream down: clears internal state through `_reset()` and then
 * emits 'close'.
 */
CombinedStream.prototype.destroy = function() {
  var self = this;

  self._reset();
  self.emit('close');
};
150
151
/**
 * Returns the instance to a non-writable, empty state: `writable` becomes
 * false, the queue is replaced by a fresh empty array and the current entry
 * is forgotten. Other fields are left as they are.
 */
CombinedStream.prototype._reset = function() {
  var self = this;

  self.writable = false;
  // A new array is assigned; the previous queue array is not modified.
  self._streams = [];
  self._currentStream = null;
};
156
157
/**
 * Recomputes `dataSize` and reports an error through `_emitError` when it is
 * greater than `maxDataSize`. A size equal to the limit is still accepted.
 */
CombinedStream.prototype._checkDataSize = function() {
  this._updateDataSize();
  if (this.dataSize <= this.maxDataSize) {
    return;
  }

  // Built as one expression so the text itself is what reaches the Error.
  var message = 'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.';
  this._emitError(new Error(message));
};
167
168
/**
 * Recomputes `dataSize` from scratch as the sum of the truthy `dataSize`
 * values of every queued entry plus that of the current entry, if any.
 * Entries without a `dataSize` (or with a falsy one) contribute nothing.
 */
CombinedStream.prototype._updateDataSize = function() {
  this.dataSize = 0;

  var queued = this._streams;
  for (var index = 0; index < queued.length; index++) {
    var queuedSize = queued[index].dataSize;
    if (queuedSize) {
      this.dataSize += queuedSize;
    }
  }

  var current = this._currentStream;
  if (current && current.dataSize) {
    this.dataSize += current.dataSize;
  }
};
184
185
/**
 * Clears internal state through `_reset()` and then emits `err` as an
 * 'error' event.
 *
 * @param {*} err value passed along with the 'error' event
 */
CombinedStream.prototype._emitError = function(err) {
  var failure = err;

  this._reset();
  this.emit('error', failure);
};
189
190