Environment: NodeJS, Express, DynamoDB (but could be any database really)
Scenario: Need to read a large number of records and return to the user as a downloadable file. This means that I cannot buffer all the content at once and then send it in a response from Express. Also, I may need to execute the query multiple times since all data might not be returned in one query.
Proposed Solution: Use a readable stream that can be piped to the response stream in Express.
I started by creating an object that inherits from stream.Readable and implemented a _read() method which pushes the query results. The problem is that the database query invoked in _read() is async but the stream.read() is a sync method.
When the stream is piped to the server's response, read is invoked several times before the db query even gets a chance to execute. So the query is invoked multiple times, and even when the first instance of the query finishes and does a push(null), the other queries complete and I get a "push() after EOF" error.
- Is there a way to do this properly with _read()?
- Should I forget about _read() and just execute the query and push() results in the constructor?
- Should I execute the query and emit data events instead of push()?
Thank you
var Readable = require('stream').Readable;
var util = require('util');

function DynamoDbResultStream(query, options) {
    if (!(this instanceof DynamoDbResultStream)) {
        return new DynamoDbResultStream(query, options);
    }
    Readable.call(this, options);
    this.dbQuery = query;
    this.done = false;
}
util.inherits(DynamoDbResultStream, Readable);

DynamoDbResultStream.prototype._read = function () {
    var self = this;
    if (!this.done) {
        dynamoDB.query(this.dbQuery, function (err, data) {
            if (!err) {
                try {
                    for (var i = 0; i < data.Items.length; i++) {
                        self.push(data.Items[i]);
                    }
                } catch (err) {
                    console.log(err);
                }
                if (data.LastEvaluatedKey) {
                    // Next read() should invoke the query with a new start key
                    self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey;
                } else {
                    self.done = true;
                    self.push(null);
                }
            } else {
                console.log(err);
                self.emit('error', err);
            }
        });
    } else {
        self.push(null);
    }
};
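For reference, this is roughly how the stream is wired to the Express response (a minimal sketch only; the route, file name, objectMode option, and the JSON-stringifying Transform are illustrative rather than my exact code):

// Sketch of the Express handler that triggers the behaviour described above.
// DynamoDB items are plain objects, so they are stringified before res.write().
var Transform = require('stream').Transform;

app.get('/export', function (req, res) {
    res.setHeader('Content-Disposition', 'attachment; filename="export.json"');

    var itemStream = new DynamoDbResultStream(query, { objectMode: true });
    var stringify = new Transform({
        writableObjectMode: true,
        transform: function (item, enc, cb) {
            cb(null, JSON.stringify(item) + '\n');
        }
    });

    itemStream.pipe(stringify).pipe(res);
});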
EDIT: After posting this question, I've found this post with an answer that shows how to do it without using inheritance: How to call an asynchronous function inside a node.js readable stream
A comment was made there that inside _read() there should be only one push(), and that each push() will usually generate another read() invocation.
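To make that concrete, here is a rough sketch of how _read() could be restructured along those lines, so that only one query is ever in flight and each _read() pushes at most one item (the internal buffer and the fetching flag are my own illustration, not taken from the linked answer):

DynamoDbResultStream.prototype._read = function () {
    var self = this;

    // Serve one item per _read() from the local buffer when possible.
    if (this.buffer && this.buffer.length) {
        this.push(this.buffer.shift());
        return;
    }
    if (this.done) {
        this.push(null);
        return;
    }
    if (this.fetching) {
        return; // a query is already in flight; its callback will push()
    }

    this.fetching = true;
    dynamoDB.query(this.dbQuery, function (err, data) {
        self.fetching = false;
        if (err) {
            return self.emit('error', err);
        }
        // Keep everything after the first item for later _read() calls.
        self.buffer = data.Items.slice(1);
        if (data.LastEvaluatedKey) {
            self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey;
        } else {
            self.done = true;
        }
        if (data.Items.length) {
            // Push exactly one item; pushing triggers further _read() calls for the rest.
            self.push(data.Items[0]);
        } else if (self.done) {
            self.push(null);
        } else {
            self._read(); // empty page but more data remains: issue the next query
        }
    });
};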
Comments:
- Can you provide an example of the code you're writing? – mikefrey
- I've added the code I have so far – swbandit
- Possibly related: stackoverflow.com/questions/20058614/… – Tomalak
- I'd point you to my scramjet module, but I don't have such a simple readable interface yet. If you're still interested I could show you how to do asynchronous stream mapping that would fit the above scenario very well. – Michał Karpacki
2 Answers
Be aware of the different modes of a Stream: https://nodejs.org/api/stream.html#stream_two_modes
const Readable = require('stream').Readable;

// starts in paused mode; the no-op read() is required so that read()/pipe() work
const readable = new Readable({ read() {} });
let i = 0;

function fetchMyAsyncData() {
  setTimeout(() => {
    // still remains in paused mode; push strings/Buffers unless objectMode is set
    readable.push(String(++i));
    if (i === 5) {
      // signal EOF; 'end' fires once the consumer has read everything
      return readable.push(null);
    }
    fetchMyAsyncData();
  }, 500);
}

// "The res object is an enhanced version of Node’s own response object and supports all built-in fields and methods."
app.get('/mystreamingresponse', (req, res) => {
  // remains in paused mode; drain everything that is currently readable
  readable.on('readable', () => {
    let chunk;
    while ((chunk = readable.read()) !== null) {
      res.write(chunk);
    }
  });
  fetchMyAsyncData();
  // closes the response stream once all external data arrived
  readable.on('end', () => res.end());
});
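For comparison, here is a minimal sketch of the same idea in flowing mode, letting pipe() handle the mode switch and the end of the response (the route and chunk values are illustrative placeholders, as above):

// Flowing mode: pipe() switches the stream out of paused mode and also
// calls res.end() for us once the readable emits 'end'.
app.get('/mystreamingresponse-piped', (req, res) => {
  const readable = new Readable({ read() {} });
  let i = 0;

  const timer = setInterval(() => {
    readable.push(String(++i));
    if (i === 5) {
      clearInterval(timer);
      readable.push(null); // EOF -> pipe() ends the response
    }
  }, 500);

  readable.pipe(res);
});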
I found an answer for this that works on Node.js 12+ (possibly even lower).
The best way to accomplish this is through generator/iterator functions.
This is an example I did with CosmosDB, which provides a continuation token for iterating over queries. However, you can execute any async call like this.
The idea is that the generator function creates an iterator on the first execution, and each yield provides the result of one iteration. The function is paused at each yield until the next value is requested, and it finishes with the final return true.
async function* reader() {
    let continuationToken = null;
    do {
        const result = await myAsyncCall(filter, continuationToken);
        continuationToken = result.continuationToken;
        // return the resources to the writer
        yield result.resources;
    } while (continuationToken);
    // finish the iterator
    return true;
}
// pipeline is the promisified stream.pipeline (require('stream/promises') on
// newer Node versions, or util.promisify(require('stream').pipeline));
// ws is the Writable that consumes the resources.
await pipeline(
    // note this is indeed the first call to reader(), not the method pointer.
    Readable.from(reader()),
    ws);
With this setup, the Writable receives each resources array as a chunk and can process it as needed.
One caveat is that there is no way to connect the Writable's highWaterMark to the Readable's highWaterMark (which used to work when I used a Readable child class).
However, it should not be a big issue, since the Writable still controls the flow and you as the developer also have control.
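For completeness, a minimal sketch of what the ws Writable passed to pipeline() above could look like (objectMode is needed because each chunk is an array of resources; processItem is a hypothetical per-item handler of mine, not part of this answer):

const { Writable } = require('stream');

// objectMode lets write() receive each resources array as-is instead of a Buffer.
const ws = new Writable({
    objectMode: true,
    write(resources, _encoding, callback) {
        // processItem is a placeholder, e.g. appending the item to the response file.
        Promise.all(resources.map(processItem))
            .then(() => callback())
            .catch(callback);
    }
});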