最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

NodeJS进程在处理大量记录后挂起

网站源码admin39浏览0评论

NodeJS进程在处理大量记录后挂起

NodeJS进程在处理大量记录后挂起

我有一个带有以下代码片段的节点 JS 进程。我正在尝试在内存 (transactionObj) 中缓存大量交易(大约 45 万)。

 const transactionObj = {};
    const transactionCursor = target_db.collection("transaction").find();
     processTxns = 0; 
    for await (const transactionDoc of transactionCursor) {
      processTxns++;
      if (transactionDoc.orderId && transactionDoc.orderId[0]) {
        const orderIdString = String(transactionDoc.orderId[0].oid);
        if (!orderIdString) continue;
        const existingDoc = transactionObj[orderIdString];
        if (!existingDoc || existingDoc._id < transactionDoc._id) {
          transactionObj[orderIdString] = transactionDoc;
        }
      }
      console.log("Process Txns: "+processTxns)
    }

当我执行此代码时,进程在处理一定数量的记录后挂起,每次大约 16K 或 20k 不同。 我尝试使用 Map 但行为相同。我也试过传递论点

node --max-old-space-size=6144

但问题依然存在。此代码在具有 32GB RAM 的 Windows 服务器上运行。代码运行时我没有看到大量内存使用(低于 50%)

回答如下:

如果这些操作是内存密集型操作,您应该分块处理它们,一种选择是在查询中实现分页并获取有限数量的事务并进行处理。


const CHUNK_SIZE = 10;

async function* readChunksFromDB() {
  const numberOfTransactions = await target_db
    .collection("transaction")
    .count();
  const numberOfChunks = Math.ceil(numberOfTransactions / CHUNK_SIZE);

  for (let i = 0; i < numberOfChunks; i++) {
    const chunk = await target_db
      .collection("transaction")
      .find({ take: CHUNK_SIZE, skip: i * CHUNK_SIZE });

    yield chunk;
  }
}

async function processChunks() {
  const transactionObj = {};
  processTxns = 0;
  for await (const chunk of readChunksFromDB()) {
    for (const transactionDoc of chunk) {
      processTxns++;
      if (transactionDoc.orderId && transactionDoc.orderId[0]) {
        const orderIdString = String(transactionDoc.orderId[0].oid);
        if (!orderIdString) continue;
        const existingDoc = transactionObj[orderIdString];
        if (!existingDoc || existingDoc._id < transactionDoc._id) {
          transactionObj[orderIdString] = transactionDoc;
        }
      }
      console.log("Process Txns: " + processTxns);
    }
  }
}

或者使用具有自动背压机制的管道的 Streams。 Streams 是 NodeJS 推荐的用于管理大量操作数据的解决方案:

const { Readable, Transform } = require("stream");

const transactionsStream = Readable.from(readChunksFromDB());
const processTransactionStream = new Transform({
  transform(chunk, encoding, callback) {
    const transactions = JSON.parse(chunk.toString());
    console.log(transactions);
    transactions
      .map((transactionDoc) => {
        if (transactionDoc.orderId && transactionDoc.orderId[0]) {
          const orderIdString = String(transactionDoc.orderId[0].oid);
          if (!orderIdString) {
            return transactionDoc;
          }

          const existingDoc = transactionObj[orderIdString];
          if (!existingDoc || existingDoc._id < transactionDoc._id) {
            transactionObj[orderIdString] = transactionDoc;
          }

          return transactionDoc;
        }
      })
      .forEach((transaction) => {
        console.log(transaction);
        this.push(JSON.stringify(transaction));
      });

    callback();
  },
});

transactionsStream.pipe(processTransactionStream).on("data", (chunk) => {
    console.log(chunk.toString())
});

在这两种情况下,您都必须管理不同的阵列以进行重复数据删除,但您将在性能方面获得很多。

https://blog.appsignal/2022/02/02/use-streams-to-build-high-performing-nodejs-applications.html

发布评论

评论列表(0)

  1. 暂无评论