最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

NodeJS Streams

网站源码admin33浏览0评论

NodeJS Streams

NodeJS Streams

我有以下问题:

我在管道中使用 NodeJS 流来读取大文件,进行一些转换,然后将其写入可写流。棘手的部分是我希望能够在满足特定条件时停止读取文件,但仍然完成处理已经读取的块。

在下面的示例中,我正在读取一个文本文件,我的转换流(secondStream)将文本转换为大写并发送到下一个流。但是如果它找到一个文本,那就意味着它应该停止从文本文件中读取,我认为这意味着读取流应该停止读取块。

我尝试了几种解决方案,但不会说我在这里有点困惑。到目前为止,我得到了以下代码。但是,通过使用

firstStream.destroy();
使管道抛出错误

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close

我能够通过在管道上捕获并忽略它来“避免”这个错误,但老实说,这对我来说听起来不安全或不正确。

  const { Transform, Writable, Readable } = require("node:stream");
  const { pipeline } = require("node:stream/promises");
  const fs = require("node:fs");

  let shouldStop = false;
  const firstStream = fs.createReadStream("./lg.txt");

  const secondStream = new Transform({
    transform(chunk, encoding, callback) {
      const foundText = chunk.toString().search("CHAPTER 9") !== -1;

      if (foundText) {
        shouldStop = true;
      }

      const transformed = chunk.toString().toUpperCase();
      callback(null, transformed);
    },
  });

  const lastStream = process.stdout;

  firstStream.on("data", () => {
    if (shouldStop) {
      console.log("should pause");
      firstStream.destroy();
    }
  });

  await pipeline(firstStream, secondStream, lastStream).catch(
    (err) => undefined
  ); // Feels wrong to me

有什么更好的方法吗?我错过了什么吗?

提前谢谢各位朋友!

回答如下:

在您的转换流中,您可以“吃掉”或“跳过”找到目标文本之后的任何数据。这样,你可以保留所有其他

pipeline()
逻辑。它不会立即终止,而是只会读取到输入流的末尾,但会跳过目标文本之后的所有数据。这允许流正常完成。

const secondStream = new Transform({
    transform(chunk, encoding, callback) {
        if (shouldStop) {
            // eat any remaining data
            callback(null, "");
        } else {
            const text = chunk.toString();
            const foundText = text.search("CHAPTER 9") !== -1;
            if (foundText) {
                // set flag to eat remaining data
                shouldStop = true;
            }
            callback(null, text.toUpperCase());
        }
    },
});

pipeline()
函数还支持中止控制器,这是一种中止管道的支持方法,同时仍然适当地清理所有内容。当你中止时,
pipeline()
将以拒绝的承诺结束,但你可以检查拒绝是否是因为你的中止,如果是,你可以得到你的中止消息。

在您的代码中,可以这样实现:

const { Transform, Writable, Readable } = require("node:stream");
const { pipeline } = require("node:stream/promises");
const fs = require("node:fs");

const firstStream = fs.createReadStream("./lg.txt");

const ac = new AbortController();
const signal = ac.signal;

const secondStream = new Transform({
    transform(chunk, encoding, callback) {
        const text = chunk.toString();
        const foundText = text.search("CHAPTER 9") !== -1;

        callback(null, text.toUpperCase());
        if (foundText) {
            ac.abort(new Error("reading terminated, match found"));
        }

    },
});

const lastStream = process.stdout;

pipeline(firstStream, secondStream, lastStream, { signal }).then(() => {
    console.log("\nall done without match");
}).catch((err) => {
    if (err.code === "ABORT_ERR") {
        console.log(`\n${signal.reason.message}`);
    } else {
        console.log(err);
    }
});

注意: 在另一个主题上,您的代码容易受到跨块边界的搜索字符串的攻击,因此不会被检测到。避免该问题的通常方法是在运行匹配搜索之前保留每个块的最后 N 个字符并将其添加到下一个块之前,其中 N 是搜索字符串的长度 - 1。这确保您不会错过搜索跨越块的字符串。您将不得不调整输出以不包含前置文本。由于这不是您问题的症结所在,我没有添加该逻辑并将其留给您,但这是可靠匹配所必需的。

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论