I have two ReadableStreams, and I want to pipe them into one WritableStream, where any data that es through the ReadableStreams goes directly into the WritableStream right then.
I can do the opposite, by using ReadableStream.prototype.tee()
to split one ReadableStream into two, but I do not know how to bine two into one.
const textarea = document.querySelector("textarea");
// This is a ReadableStream which says "Mom! " every 1 second.
const momReadableStream = new ReadableStream({ start: controller => {
const sayMom = () => controller.enqueue("Mom! ");
setInterval(sayMom, 1000);
}});
// This is a ReadableStream which says "Lois! " every 0.7 seconds.
const loisReadableStream = new ReadableStream({ start: controller => {
const sayLois = () => controller.enqueue("Lois! ");
setInterval(sayLois, 700);
}});
// This is a WritableStream which displays what it receives in a textarea.
const writableStream = new WritableStream({ write: (chunk, controller) => textarea.value += chunk });
momReadableStream.pipeTo(writableStream).catch(console.error); // Works fine, words display
loisReadableStream.pipeTo(writableStream).catch(console.error); // Words do not display, and Errors with "Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe to a locked stream"
<textarea readonly></textarea>
I have two ReadableStreams, and I want to pipe them into one WritableStream, where any data that es through the ReadableStreams goes directly into the WritableStream right then.
I can do the opposite, by using ReadableStream.prototype.tee()
to split one ReadableStream into two, but I do not know how to bine two into one.
const textarea = document.querySelector("textarea");
// This is a ReadableStream which says "Mom! " every 1 second.
const momReadableStream = new ReadableStream({ start: controller => {
const sayMom = () => controller.enqueue("Mom! ");
setInterval(sayMom, 1000);
}});
// This is a ReadableStream which says "Lois! " every 0.7 seconds.
const loisReadableStream = new ReadableStream({ start: controller => {
const sayLois = () => controller.enqueue("Lois! ");
setInterval(sayLois, 700);
}});
// This is a WritableStream which displays what it receives in a textarea.
const writableStream = new WritableStream({ write: (chunk, controller) => textarea.value += chunk });
momReadableStream.pipeTo(writableStream).catch(console.error); // Works fine, words display
loisReadableStream.pipeTo(writableStream).catch(console.error); // Words do not display, and Errors with "Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe to a locked stream"
<textarea readonly></textarea>
Share
Improve this question
edited Apr 15, 2023 at 4:20
Ry-♦
226k56 gold badges493 silver badges499 bronze badges
asked Aug 11, 2022 at 1:30
baNaNabaNaNa
8025 silver badges24 bronze badges
2 Answers
Reset to default 8Manually, by racing the most recent read from each reader to produce the overall read and initiating those reads as necessary:
const never = new Promise(() => {});
const mergeStreams = streams => {
const readers = streams.map(s => s.getReader());
const reads = streams.map(() => null);
const dones = [];
const allDone = Promise.all(streams.map(s => new Promise(resolve => {
dones.push(resolve);
})));
return new ReadableStream({
start: controller => {
allDone.then(() => {
controller.close();
});
},
pull: controller =>
Promise.race(
readers.map((r, i) =>
reads[i] ??= r.read().then(({value, done}) => {
if (done) {
dones[i]();
return never;
}
controller.enqueue(value);
reads[i] = null;
})
)
),
cancel: reason => {
for (const reader of readers) {
reader.cancel(reason);
}
},
});
};
const textarea = document.querySelector("textarea");
const never = new Promise(() => {});
const mergeStreams = streams => {
const readers = streams.map(s => s.getReader());
const reads = streams.map(() => null);
const dones = [];
const allDone = Promise.all(streams.map(s => new Promise(resolve => {
dones.push(resolve);
})));
return new ReadableStream({
start: controller => {
allDone.then(() => {
controller.close();
});
},
pull: controller =>
Promise.race(
readers.map((r, i) =>
reads[i] ??= r.read().then(({value, done}) => {
if (done) {
dones[i]();
return never;
}
controller.enqueue(value);
reads[i] = null;
})
)
),
cancel: reason => {
for (const reader of readers) {
reader.cancel(reason);
}
},
});
};
// This is a ReadableStream which says "Mom! " every 1 second.
const momReadableStream = new ReadableStream({ start: controller => {
const sayMom = () => controller.enqueue("Mom! ");
setInterval(sayMom, 1000);
}});
// This is a ReadableStream which says "Lois! " every 0.7 seconds.
const loisReadableStream = new ReadableStream({ start: controller => {
const sayLois = () => controller.enqueue("Lois! ");
setInterval(sayLois, 700);
}});
// This is a WritableStream which displays what it receives in a textarea.
const writableStream = new WritableStream({ write: (chunk, controller) => textarea.value += chunk });
mergeStreams([
momReadableStream,
loisReadableStream,
]).pipeTo(writableStream).catch(console.error);
<textarea readonly></textarea>
You may create a MergingTransformStream
class. Unlike a standard TransformStream
the MergingTransformStream
has 2 WritableStream
s and a ReadableStream
.
The following code is a matchstick which doesn't cover error handling, edge cases etc however it may give you an idea.
Things to pay attention:
- We are using arrows in the option methods to capture the
this
ofMergingTransformStream
properly. - This is not a production code. No errors etc handled.
- 2 source ReadableStreams
rs
andqs
produce positive and negative numbers respectively every 10ms. - You can only stop the
.pipeTo()
ed WebStreaming API chains byAbortController()
unless the readablestream parts terminate themselves by some means like error. We have to.abort()
them in turn. - The final writestream (
ws
) logs the merged data. - No plains on indenting please :)
class MergingTransformStream {
constructor(){
this.readable = new ReadableStream({ start: controller => {
this.writableA = new WritableStream({ write: chunk => controller.enqueue(chunk)
, abort: r => ( console.log(`Nullified writeableA part of MTS due to ${r}`)
, this.writableA = null
)
});
this.writableB = new WritableStream({ write: chunk => controller.enqueue(chunk)
, abort: r => ( console.log(`Nullified writeableB part of MTS due to ${r}`)
, this.writableB = null
)
});
}
, cancel: r => {
console.log(`Nullified the readable part of MTS due to ${r}`);
this.readable = null;
}
})
}
pipeTo(...args){
return this.readable.pipeTo(...args);
}
}
const rs = new ReadableStream({ pull(controller){ controller.enqueue((Math.random()*1000).toFixed(2));
return new Promise(v => setTimeout( v
, 10
, true
))}
, cancel(r){console.log(`rs closed due to ${r}`)}
}),
qs = new ReadableStream({ pull(controller){ controller.enqueue((Math.random()*-1000).toFixed(2));
return new Promise(v => setTimeout( v
, 10
, true
))}
, cancel(r){console.log(`qs closed due to ${r}`)}
}),
ms = new MergingTransformStream(),
ws = new WritableStream({ write(chunk){
console.log("downstream:",chunk)
}
, abort(r){
console.log(`ws aborted due to ${r}`);
}
}),
cA = new AbortController(),
cB = new AbortController(),
cC = new AbortController();
rs.pipeTo(ms.writableA, {signal: cA.signal})
.catch(e => console.log("A channel aborted"));
qs.pipeTo(ms.writableB, {signal: cB.signal})
.catch(e => console.log("B channel aborted"));
ms.pipeTo(ws, {signal: cC.signal})
.catch(e => console.log(`MTS wiped out pletely due to ${e}`));
setTimeout( _ => ( cA.abort("abort stream A")
, cB.abort("abort stream B")
, cC.abort("abort downstream")
)
, 1000
);