最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

pipeline - How to pipe two ReadableStreams into one WritableStream in Javascript? - Stack Overflow

programmeradmin0浏览0评论

I have two ReadableStreams, and I want to pipe them into one WritableStream, where any data that comes through the ReadableStreams goes directly into the WritableStream right then.

I can do the opposite, by using ReadableStream.prototype.tee() to split one ReadableStream into two, but I do not know how to combine two into one.

// Output element: every chunk that reaches the WritableStream is appended here.
const textarea = document.querySelector("textarea");

// Source #1: enqueues "Mom! " once per second.
const momReadableStream = new ReadableStream({
  start(controller) {
    setInterval(() => {
      controller.enqueue("Mom! ");
    }, 1000);
  },
});

// Source #2: enqueues "Lois! " every 700 ms.
const loisReadableStream = new ReadableStream({
  start(controller) {
    setInterval(() => {
      controller.enqueue("Lois! ");
    }, 700);
  },
});

// Sink: appends each received chunk to the textarea's value.
const writableStream = new WritableStream({
  write(chunk) {
    textarea.value += chunk;
  },
});

// First pipe works fine: the words are displayed.
momReadableStream.pipeTo(writableStream).catch(console.error);
// Second pipe fails: no words are displayed, and it errors with
// "Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe to a locked stream".
loisReadableStream.pipeTo(writableStream).catch(console.error);
<textarea readonly></textarea>

I have two ReadableStreams, and I want to pipe them into one WritableStream, where any data that comes through the ReadableStreams goes directly into the WritableStream right then.

I can do the opposite, by using ReadableStream.prototype.tee() to split one ReadableStream into two, but I do not know how to combine two into one.

// Output element: every chunk that reaches the WritableStream is appended here.
const textarea = document.querySelector("textarea");

// Source #1: enqueues "Mom! " once per second.
const momReadableStream = new ReadableStream({
  start(controller) {
    setInterval(() => {
      controller.enqueue("Mom! ");
    }, 1000);
  },
});

// Source #2: enqueues "Lois! " every 700 ms.
const loisReadableStream = new ReadableStream({
  start(controller) {
    setInterval(() => {
      controller.enqueue("Lois! ");
    }, 700);
  },
});

// Sink: appends each received chunk to the textarea's value.
const writableStream = new WritableStream({
  write(chunk) {
    textarea.value += chunk;
  },
});

// First pipe works fine: the words are displayed.
momReadableStream.pipeTo(writableStream).catch(console.error);
// Second pipe fails: no words are displayed, and it errors with
// "Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe to a locked stream".
loisReadableStream.pipeTo(writableStream).catch(console.error);
<textarea readonly></textarea>

Share Improve this question edited Apr 15, 2023 at 4:20 Ry- 226k56 gold badges493 silver badges499 bronze badges asked Aug 11, 2022 at 1:30 baNaNabaNaNa 8025 silver badges24 bronze badges
Add a comment  | 

2 Answers 2

Reset to default 8

Manually, by racing the most recent read from each reader to produce the overall read and initiating those reads as necessary:

// A promise that never settles. A finished source's read slot is parked on
// it so that source can never win the `Promise.race` in `pull` again.
const never = new Promise(() => {});

/**
 * Merges several ReadableStreams into a single ReadableStream that emits
 * each chunk as soon as any source produces it.
 *
 * Every source is locked via `getReader()`. The merged stream closes once
 * all sources have reported `done`, errors if a source read rejects, and
 * cancels every source when it is cancelled itself.
 *
 * @param {ReadableStream[]} streams - Source streams to merge.
 * @returns {ReadableStream} Stream yielding the interleaved chunks.
 */
const mergeStreams = streams => {
    const readers = streams.map(s => s.getReader());
    // reads[i] holds the in-flight read of readers[i]; null means "start a
    // new read on the next pull".
    const reads = streams.map(() => null);
    // dones[i]() marks source i as finished; allDone settles once all are.
    const dones = [];
    const allDone = Promise.all(streams.map(() => new Promise(resolve => {
        dones.push(resolve);
    })));
    let cancelled = false;

    return new ReadableStream({
        start: controller => {
            allDone.then(() => {
                // Cancelling the sources settles their pending reads as
                // done, which lands here after the merged stream is already
                // closed; close() would then throw in a floating promise.
                if (!cancelled) {
                    controller.close();
                }
            });
        },
        pull: controller =>
            Promise.race(
                readers.map((r, i) =>
                    reads[i] ??= r.read().then(({value, done}) => {
                        if (done) {
                            dones[i]();
                            return never;
                        }

                        controller.enqueue(value);
                        reads[i] = null;
                    })
                )
            ),
        cancel: reason => {
            cancelled = true;
            // Return the combined promise so the caller's cancel() waits
            // for (and sees failures from) every source cancellation.
            return Promise.all(readers.map(reader => reader.cancel(reason)));
        },
    });
};

// Textarea element whose value the demo's WritableStream appends chunks to.
const textarea = document.querySelector("textarea");


// A promise that never settles. A finished source's read slot is parked on
// it so that source can never win the `Promise.race` in `pull` again.
const never = new Promise(() => {});

/**
 * Merges several ReadableStreams into a single ReadableStream that emits
 * each chunk as soon as any source produces it.
 *
 * Every source is locked via `getReader()`. The merged stream closes once
 * all sources have reported `done`, errors if a source read rejects, and
 * cancels every source when it is cancelled itself.
 *
 * @param {ReadableStream[]} streams - Source streams to merge.
 * @returns {ReadableStream} Stream yielding the interleaved chunks.
 */
const mergeStreams = streams => {
    const readers = streams.map(s => s.getReader());
    // reads[i] holds the in-flight read of readers[i]; null means "start a
    // new read on the next pull".
    const reads = streams.map(() => null);
    // dones[i]() marks source i as finished; allDone settles once all are.
    const dones = [];
    const allDone = Promise.all(streams.map(() => new Promise(resolve => {
        dones.push(resolve);
    })));
    let cancelled = false;

    return new ReadableStream({
        start: controller => {
            allDone.then(() => {
                // Cancelling the sources settles their pending reads as
                // done, which lands here after the merged stream is already
                // closed; close() would then throw in a floating promise.
                if (!cancelled) {
                    controller.close();
                }
            });
        },
        pull: controller =>
            Promise.race(
                readers.map((r, i) =>
                    reads[i] ??= r.read().then(({value, done}) => {
                        if (done) {
                            dones[i]();
                            return never;
                        }

                        controller.enqueue(value);
                        reads[i] = null;
                    })
                )
            ),
        cancel: reason => {
            cancelled = true;
            // Return the combined promise so the caller's cancel() waits
            // for (and sees failures from) every source cancellation.
            return Promise.all(readers.map(reader => reader.cancel(reason)));
        },
    });
};


// Source #1: enqueues "Mom! " once per second.
const momReadableStream = new ReadableStream({
  start(controller) {
    setInterval(() => {
      controller.enqueue("Mom! ");
    }, 1000);
  },
});

// Source #2: enqueues "Lois! " every 700 ms.
const loisReadableStream = new ReadableStream({
  start(controller) {
    setInterval(() => {
      controller.enqueue("Lois! ");
    }, 700);
  },
});

// Sink: appends each received chunk to the textarea's value.
const writableStream = new WritableStream({
  write(chunk) {
    textarea.value += chunk;
  },
});

// Merge both sources into one stream and pipe it into the single sink,
// logging any pipe failure.
const mergedStream = mergeStreams([momReadableStream, loisReadableStream]);
mergedStream.pipeTo(writableStream).catch(console.error);
<textarea readonly></textarea>

You may create a MergingTransformStream class. Unlike a standard TransformStream, the MergingTransformStream has two WritableStreams and one ReadableStream. The following code is a matchstick sketch that doesn't cover error handling, edge cases, etc.; however, it may give you an idea.

Things to pay attention to:

  1. We are using arrows in the option methods to capture the this of MergingTransformStream properly.
  2. This is not production code. No errors etc. are handled.
  3. 2 source ReadableStreams rs and qs produce positive and negative numbers respectively every 10ms.
  4. You can only stop the .pipeTo()ed WebStreaming API chains by AbortController() unless the readablestream parts terminate themselves by some means like error. We have to .abort() them in turn.
  5. The final writestream (ws) logs the merged data.
  6. No complaints about the indenting, please :)

/**
 * A merging counterpart to TransformStream: two writable inputs
 * (`writableA`, `writableB`) feed one `readable` output.
 *
 * Each chunk written to either input is enqueued on the readable as-is.
 * Aborting an input logs the reason and sets that property to null;
 * cancelling the readable logs the reason and sets `readable` to null.
 */
class MergingTransformStream {
  constructor() {
    this.readable = new ReadableStream({
      // Arrow functions keep `this` bound to the MergingTransformStream.
      start: (controller) => {
        // Both inputs share the readable's controller, so their chunks land
        // in one queue.
        const forward = (chunk) => controller.enqueue(chunk);

        this.writableA = new WritableStream({
          write: forward,
          abort: (reason) => {
            console.log(`Nullified writeableA part of MTS due to ${reason}`);
            this.writableA = null;
          },
        });

        this.writableB = new WritableStream({
          write: forward,
          abort: (reason) => {
            console.log(`Nullified writeableB part of MTS due to ${reason}`);
            this.writableB = null;
          },
        });
      },
      cancel: (reason) => {
        console.log(`Nullified the readable part of MTS due to ${reason}`);
        this.readable = null;
      },
    });
  }

  /**
   * Forwards to `this.readable.pipeTo`, passing all arguments through.
   *
   * @param {...*} args - Arguments for `ReadableStream.prototype.pipeTo`.
   * @returns {Promise} The promise returned by `pipeTo`.
   */
  pipeTo(...args) {
    return this.readable.pipeTo(...args);
  }
}
// Source A: each pull enqueues a positive number string, then resolves
// after 10 ms.
const rs = new ReadableStream({
  pull(controller) {
    controller.enqueue((Math.random() * 1000).toFixed(2));
    return new Promise(resolve => setTimeout(resolve, 10, true));
  },
  cancel(r) {
    console.log(`rs closed due to ${r}`);
  },
});

// Source B: same as source A, but the numbers are negative.
const qs = new ReadableStream({
  pull(controller) {
    controller.enqueue((Math.random() * -1000).toFixed(2));
    return new Promise(resolve => setTimeout(resolve, 10, true));
  },
  cancel(r) {
    console.log(`qs closed due to ${r}`);
  },
});

// Merger with two writable inputs and one readable output.
const ms = new MergingTransformStream();

// Final sink: logs every merged chunk.
const ws = new WritableStream({
  write(chunk) {
    console.log("downstream:", chunk);
  },
  abort(r) {
    console.log(`ws aborted due to ${r}`);
  },
});

// One AbortController per pipe so each leg can be stopped on its own.
const cA = new AbortController();
const cB = new AbortController();
const cC = new AbortController();

rs.pipeTo(ms.writableA, {signal: cA.signal})
  .catch(e => console.log("A channel aborted"));
qs.pipeTo(ms.writableB, {signal: cB.signal})
  .catch(e => console.log("B channel aborted"));
ms.pipeTo(ws, {signal: cC.signal})
  .catch(e => console.log(`MTS wiped out completely due to ${e}`));

// After one second, abort the three pipes in turn: A, then B, then the
// downstream leg.
setTimeout(() => {
  cA.abort("abort stream A");
  cB.abort("abort stream B");
  cC.abort("abort downstream");
}, 1000);

发布评论

评论列表(0)

  1. 暂无评论