I'm trying to learn streams and am having a little bit of an issue getting them to work right.
For this example, I'd simply like to push a static object to the stream and pipe that to my server's response.
Here's what I have so far, but a lot of it doesn't work. If I could even just get the stream to output to console, I can figure out how to pipe it to my response.
var Readable = require('stream').Readable;
// Constructor for a custom readable stream.
// NOTE(review): `options` is accepted but not forwarded to Readable, so the
// {objectMode: true} passed further down is never applied.
var MyStream = function(options) {
Readable.call(this);
};
// NOTE(review): nothing in this snippet links MyStream.prototype to
// Readable.prototype; only the Readable constructor is invoked.
// _read implementation: pushes `chunk` onto the stream.
// NOTE(review): `chunk` is not defined anywhere in this snippet.
MyStream.prototype._read = function(n) {
this.push(chunk);
};
var stream = new MyStream({objectMode: true});
// NOTE(review): the instance above is bound to `stream`, but the next two
// statements use `s`, which is not defined in this snippet.
s.push({test: true});
// NOTE(review): `request` is not defined in this snippet — presumably the
// server framework's request object; confirm against the surrounding handler.
request.reply(s);
I'm trying to learn streams and am having a little bit of an issue getting them to work right.
For this example, I'd simply like to push a static object to the stream and pipe that to my server's response.
Here's what I have so far, but a lot of it doesn't work. If I could even just get the stream to output to console, I can figure out how to pipe it to my response.
var Readable = require('stream').Readable;
// Constructor for a custom readable stream.
// NOTE(review): `options` is accepted but not forwarded to Readable, so the
// {objectMode: true} passed further down is never applied.
var MyStream = function(options) {
Readable.call(this);
};
// NOTE(review): nothing in this snippet links MyStream.prototype to
// Readable.prototype; only the Readable constructor is invoked.
// _read implementation: pushes `chunk` onto the stream.
// NOTE(review): `chunk` is not defined anywhere in this snippet.
MyStream.prototype._read = function(n) {
this.push(chunk);
};
var stream = new MyStream({objectMode: true});
// NOTE(review): the instance above is bound to `stream`, but the next two
// statements use `s`, which is not defined in this snippet.
s.push({test: true});
// NOTE(review): `request` is not defined in this snippet — presumably the
// server framework's request object; confirm against the surrounding handler.
request.reply(s);
Share
Improve this question
asked Dec 20, 2013 at 17:37
doremi
15.3k reputation · 31 gold badges · 97 silver badges · 152 bronze badges
2 Answers
Reset to default
10
There are a couple of issues with your current code.
- The request stream is most likely a buffer mode stream: this means that you can't write objects into it. Fortunately, you don't pass through the options to the `Readable` constructor so your mistake doesn't cause any trouble, but semantically this is wrong and will not produce the expected results.
- You call the constructor of `Readable`, but don't inherit the prototype properties. You should use `util.inherits()` to subclass `Readable`.
- The `chunk` variable isn't defined anywhere in your code sample.
Here is a working example:
var util = require('util');
var Readable = require('stream').Readable;
// Readable stream that emits the string 'foobar' on every read and ends on
// the read that finds its counter at 0.
//
// options: forwarded unchanged to the Readable constructor.
var MyStream = function(options) {
  // Run the parent constructor so the stream state is set up on `this`.
  Readable.call(this, options);

  // Countdown inspected (and decremented) by each _read call.
  this.counter = 1000;
};

// Link MyStream's prototype chain to Readable's so instances get its methods.
util.inherits(MyStream, Readable);

// Supplies data to the stream: always pushes one 'foobar' chunk, then checks
// the counter value from before the decrement to decide whether to finish.
MyStream.prototype._read = function(n) {
  this.push('foobar');

  var previous = this.counter--;
  if (previous !== 0) {
    return;
  }

  // Pushing null signals that no more data will follow.
  this.push(null);
};
// Create a stream with no options and pipe everything it emits to standard output.
var mystream = new MyStream();
mystream.pipe(process.stdout);
The following is a variation of Paul Mougel's answer using ES6. It implements a countdown stream from a positive `counter` down to `0` (the counter defaults to `1000`). Then we create a stream, `counter`, that starts at `100` and is piped into the writable stream `process.stdout`:
const { Readable } = require('stream');
// Readable stream that counts down from a starting value to 0.
//
// Every value above 0 is emitted followed by a newline; the final "0" is
// emitted without a trailing newline, immediately before the stream ends.
class CountdownStream extends Readable {
  // counter: starting value. It is coerced with Number(); a finite positive
  //          value is floored to a whole number so the countdown is
  //          guaranteed to land exactly on 0. Anything else (missing, NaN,
  //          non-numeric, zero, negative, Infinity) falls back to 1000.
  // options: passed unchanged to the Readable constructor.
  constructor(counter, options) {
    super(options);
    const start = Number(counter);
    // Number.isFinite rejects NaN and Infinity without the implicit coercion
    // of the global isNaN; flooring prevents a fractional start such as 2.5
    // from stepping past 0 and producing a stream that never ends.
    this._counter = Number.isFinite(start) && start > 0 ? Math.floor(start) : 1000;
  }

  // Pushes the current value and, once the countdown has reached 0, ends the
  // stream. `size` is accepted for the Readable contract but not used.
  _read(size) {
    // `<= 0` rather than `=== 0` so the stream still terminates if the
    // counter is ever moved below zero from outside.
    if (this._counter <= 0) {
      this.push(String(this._counter));
      this.push(null); // null marks the end of the stream
    } else {
      this.push(`${this._counter}\n`);
    }
    this._counter -= 1;
  }
}
// Build a countdown starting at 100 and pipe its output to standard output.
const counter = new CountdownStream(100);
counter.pipe(process.stdout);