When writing a NodeJS Transform stream, how can I know inside the transform function whether the chunk is the last one, or whether no new chunks are coming?
_transform(chunk: any, encoding: string, callback: Function): void {
  // accumulating chunks into a buffer here,
  // because I need to do some processing on the whole buffer
  // and I need to understand when to do that
}
So I need to know when the chunks coming into the Stream are over, to do some processing on the buffer composed of all the chunks, and then push the processed data from the stream.
- Not a complete solution, but if you expect a fixed chunk size (I'm saying this because you mentioned you accumulate them, perhaps to a fixed size?), then you could use simple maths to work out the number of iterations required to reach the last chunk – juztcode
- Or keep track of the total size of chunks processed: if (totalProcessed + currentChunkSize) === file.size, then that's your last chunk – juztcode
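A minimal sketch of that size-tracking idea, assuming the source is a file whose size is known up front (the file path and variable names here are hypothetical):
const fs = require('fs');
const { Transform } = require('stream');

const filePath = '/tmp/important.dat'; // hypothetical input file
const totalSize = fs.statSync(filePath).size;
let processed = 0;

const lastAware = new Transform({
  transform(chunk, encoding, callback) {
    processed += chunk.length;
    if (processed >= totalSize) {
      // this is the last chunk of the file
    }
    callback(null, chunk); // pass the chunk through unchanged
  },
});

fs.createReadStream(filePath).pipe(lastAware);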
4 Answers
In the _transform you cannot figure out whether there will be more data or not. Depending on your use case, you would either listen to the end event or use _flush:
Stream: transform._flush(callback):
Custom Transform implementations may implement the transform._flush() method. This will be called when there is no more written data to be consumed, but before the 'end' event is emitted signaling the end of the Readable stream.
Within the transform._flush() implementation, the readable.push() method may be called zero or more times, as appropriate. The callback function must be called when the flush operation is complete.
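For the end-event route, a minimal sketch (the file path is a hypothetical placeholder); note that by the time 'end' fires, every chunk has already passed through _transform:
const fs = require('fs');

const source = fs.createReadStream('/tmp/important.dat'); // hypothetical input
source.on('end', () => {
  // no more chunks will arrive; safe to finalize here
});
source.resume(); // 'end' only fires once the stream is actually consumed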
Example using _flush:
const { Transform } = require('stream');

const COUNTER_NULL_SYMBOL = Symbol('COUNTER_NULL_SYMBOL');

// emits {data, counter, first, last} objects; holds one chunk back so the
// final one can be pushed with last: true from _flush
const Counter = () => {
  let data = COUNTER_NULL_SYMBOL;
  let counter = 1;
  let first = true;
  const counterStream = new Transform({
    objectMode: true,
    decodeStrings: false,
    highWaterMark: 1,
    transform(chunk, encoding, callback) {
      if (data === COUNTER_NULL_SYMBOL) {
        data = chunk;
        return callback();
      } else {
        this.push({data, counter, last: false, first});
        first = false;
        counter++;
        data = chunk;
        return callback();
      }
    },
  });
  counterStream._flush = function (callback) {
    if (data === COUNTER_NULL_SYMBOL) {
      return callback();
    } else {
      this.push({data, counter, last: true, first});
      return callback();
    }
  };
  return counterStream;
};
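Applied directly to the question's accumulate-then-process use case, a minimal sketch using the flush constructor option (processWholeBuffer is a hypothetical placeholder for your own processing):
const { Transform } = require('stream');

const chunks = [];
const bufferingStream = new Transform({
  transform(chunk, encoding, callback) {
    chunks.push(chunk); // only accumulate; push nothing yet
    callback();
  },
  flush(callback) {
    // all written data has been consumed; process the whole buffer now
    const whole = Buffer.concat(chunks);
    this.push(processWholeBuffer(whole)); // hypothetical processing step
    callback();
  },
});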
Using through2:
const fs = require('fs');
const through2 = require('through2');

fs.createReadStream('/tmp/important.dat')
  .pipe(through2(
    (chunk, enc, cb) => cb(null, chunk), // transform is a noop
    function (cb) { // FLUSH FUNCTION
      this.push('tacking on an extra buffer to the end');
      cb();
    }
  ))
  .pipe(fs.createWriteStream('/tmp/wut.txt'));
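(through2's second function argument becomes the stream's flush function, so this is the same _flush mechanism as in the previous answer.)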
EDIT: WARNING, DO NOT USE (I noticed that transform streams can still be pushed chunks after the source has ended or unpiped; I will see if I can come up with a solution and update this answer).
I have written a "last chunk aware transform" to demonstrate how to deal with this dilemma. Knowing at "chunk transform time" whether the chunk is the last one can be very useful (checking it afterwards in _flush() can be too late). This transform is also compatible with streams that don't end it when done (they unpipe instead); to try that out, just uncomment the // end: false
line. My code is license free, use it as you wish.
import * as stream from 'node:stream'

// for this demo
class Source extends stream.Readable {
  #offset = 0; #size = 10
  _read(preferredSize) {
    if (this.#offset > this.#size) {
      this.push(null) // end stream
    } else {
      this.push(Buffer.from([this.#offset++]))
    }
  }
}

// for this demo (to inspect result)
class Output extends stream.Writable {
  _write(chunk, _encoding, callback) {
    console.log(chunk); callback()
  }
  _final(callback) {
    console.log('end'); callback()
  }
}

// last chunk aware transform
class Transform extends stream.Transform {
  #prevChunk
  #endOnUnpipe = true
  constructor() {
    super()
    this.on('unpipe', this.#onUnpipe)
  }
  _transform(chunk, _encoding, callback) {
    if (!this.#prevChunk) {
      this.#prevChunk = chunk
      callback()
    } else {
      this.#transform(this.#prevChunk, false, callback)
      this.#prevChunk = chunk
    }
  }
  #transform(chunk, isLast, callback) {
    if (isLast) chunk[0] = 0xFF // mark the last chunk
    this.push(chunk) // no other change
    callback()
  }
  _flush(callback) {
    console.log('flush')
    if (this.#prevChunk) {
      this.#transform(this.#prevChunk, true, callback)
    } else {
      callback() // no chunks were ever received
    }
  }
  // if source did not end our transform
  #onUnpipe() {
    if (!this.writableEnded) {
      console.log('unpipe')
      if (this.#endOnUnpipe) {
        this.end() // will trigger _flush
      } else {
        this.#transform(this.#prevChunk, true, () => {})
      }
    }
  }
}

const
  source = new Source(),
  transform = new Transform(),
  output = new Output()

transform.pipe(output)
source.pipe(transform, {
  // end: false // whether to end transform or not at end of source
})
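For reference, running this demo should print the buffers 00 through 09, then flush, then <Buffer ff> (the held-back final chunk that #transform marked as last), and finally end.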
Usage notes: This works fine unless the stream can suddenly pause for a long time; in that case the transform of the last chunk will be delayed until the stream ends or more data is received. Be aware of this if that could be a problem for you.