How do I get the second line of a file using readline in Node.js?

This is a simple file splitter. The first line written inside the loop makes it into the write stream, but lines written on subsequent iterations of the `for await` readline loop do not. How do you write the subsequent lines? (A drain-aware sketch follows the code below.)

import * as fs from "fs";
import * as readline from "readline";
import {WriteStream} from "fs";

const tempWriteStream = (temporary: string): WriteStream => {
  console.log(`Temporary file is ${temporary}`);
  return fs.createWriteStream(temporary);
}

const fileSplitter = async (sourceBucket: GetObjectCommandInput) => {
  const data = await s3Client.send(new GetObjectCommand(sourceBucket));
  const rl = readline.createInterface({
    input: data.Body,
    crlfDelay: Infinity
  });

  let count = 0
  let header = ""
  let temporary = createTemporaryFile();
  let writeStream = tempWriteStream(temporary)

  for await (const line of rl) {
    if (count === 0) {
        header = line;
    }
    count++;
    // Handle drain
    const writeCanContinue = writeStream.write(line)
    if (count > limit || !writeCanContinue) {
        console.log('Starting a new file')
        writeStream.end()
        handleCreateNewFile()
    }
  }

  await writeStream.end(async () => {
  })
  await writeStream.on('finish', async () => {
    await s3Put(temporary, sourceBucket)
  })
  // data.Body is a readable stream
}
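
For context, the conventional fix for this kind of loop (distinct from the answer below) is to re-append the newline that readline strips and to wait for the 'drain' event whenever write() reports a full buffer. A minimal sketch, assuming nothing beyond Node's standard library (the `copyLines` helper and file names are illustrative):

import * as fs from "fs";
import * as readline from "readline";
import {once} from "events";

// Copy lines from one file to another while honouring backpressure.
// readline strips the line terminator, so it is re-appended on write.
const copyLines = async (source: string, target: string) => {
  const rl = readline.createInterface({
    input: fs.createReadStream(source),
    crlfDelay: Infinity
  });
  const out = fs.createWriteStream(target);
  for await (const line of rl) {
    // write() returns false when the internal buffer is full;
    // pause until 'drain' fires before writing more.
    if (!out.write(line + "\n")) {
      await once(out, "drain");
    }
  }
  out.end();
};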
Answer:

As others have said, don't use readline. I also dropped the writeStream. Here is the entire Lambda, on AWS JavaScript SDK v3.

import 'source-map-support/register';
import {s3Client} from "@api/clients/s3Client";
import {GetObjectCommand, GetObjectCommandInput, PutObjectCommand, PutObjectCommandOutput} from "@aws-sdk/client-s3";
import * as fs from "fs";
import path from "path";
import {winnerAppConfig} from "@api/config/app-config";
import mktemp from "mktemp";
import * as os from "os";
import {sdkStreamMixin} from "@aws-sdk/util-stream-node";

const limit = 2000;


const s3Put = async (temporary: string, sourceBucket: GetObjectCommandInput): Promise<PutObjectCommandOutput> => {
    const fileName = path.parse(temporary).name;
    console.log(`Filename is ${fileName}`);
    console.log(JSON.stringify(path.parse(temporary)));
    try {
        const stats = fs.statSync(temporary);
        console.log(`Stats are ${JSON.stringify(stats)}`)
    } catch (err) {
        console.error(err);
    }

    const fileStream = fs.createReadStream(temporary);
    let targetS3Name = sourceBucket.Key
    if (targetS3Name.includes('.')) {
        const fileKeyNoSuffixParts = sourceBucket.Key.split('.')
        fileKeyNoSuffixParts.pop()
        targetS3Name = fileKeyNoSuffixParts.join('.')
    }

    const fileKeyNoSuffix = targetS3Name + `-${fileName}.csv`

    const putObject: PutObjectCommand = new PutObjectCommand({
        Bucket: winnerAppConfig.processingBucket,
        Key: fileKeyNoSuffix,
        Body: fileStream
    });

    try {
        return await s3Client.send(putObject);
    } catch (err) {
        console.log('Error');
        console.log(`Error putting object ${putObject.input.Key}`, err);
    }
}
const createTemporaryFile = () => {
    try {
        return mktemp.createFileSync('/tmp/XXXX.csv');
    } catch (err) {
        console.error(`Could not create the temporary file ${err}`)
        throw err;
    }
}
const fileSplitter = async (sourceBucket: GetObjectCommandInput) => {
    const data = await s3Client.send(new GetObjectCommand(sourceBucket));

    let count = 0
    let header = ""
    let remaining = ''
    const s3Promises = []
    let buffer = "";
    const processRows = () => {
        const sdkStream = sdkStreamMixin(data.Body);

        sdkStream.on('data', (chunk) => {
            const lines = (remaining + chunk).split(/\r?\n/g);
            console.log(`Bytes long ${Buffer.byteLength(remaining + chunk)} overlap ${Buffer.byteLength(remaining + chunk) % 4096}`)
            // The last element may be a partial line; carry it into the next chunk.
            remaining = lines.pop()
            if ((header === "" || header === os.EOL) && lines.length > 0) {
                header = lines[0] + os.EOL
                console.log(`Header is ${header}`)
            }
            // Count is not perfect, but near enough for performance splitting.
            count += lines.length
            buffer += lines.filter(line => line.length > 0).join(os.EOL)
            if (buffer.length > 0) {
                buffer += os.EOL
            }
            // Once the row count passes the limit, flush the buffer to a
            // temporary file, start the upload, and begin a new part that
            // repeats the header row.
            if (count > limit) {
                const temporaryFile = createTemporaryFile()
                fs.writeFileSync(temporaryFile, buffer)
                s3Promises.push(s3Put(temporaryFile, sourceBucket))
                buffer = header
                count = 0
            }
        })
    }

    processRows()

    // Read that s3 needs close rather than finish
    // https://stackoverflow.com/questions/37837132/how-to-wait-for-a-stream-to-finish-piping-nodejs
    const lastBlockSdkStream = sdkStreamMixin(data.Body);
    const lastBlockPromise = new Promise<void>((resolve) => lastBlockSdkStream.on('close', async () => {
        console.log(`Close - writing file to the bucket`)
        buffer += remaining;
        const temporaryFile = createTemporaryFile()
        fs.writeFileSync(temporaryFile, buffer)
        await s3Put(temporaryFile, sourceBucket)
        resolve()
    }))
    s3Promises.push(lastBlockPromise)

    return Promise.allSettled(s3Promises).then(() => {
        console.log('S3 done')
    })

    // data.Body is a readable stream
}

export const splitter = async (event, context): Promise<any> => {
    const s3EventRecords = event.Records;
    console.log('Context is ' + JSON.stringify(context))
    console.log('s3EventRecords ' + JSON.stringify(s3EventRecords))

    const triggerBucket = s3EventRecords[0].s3.bucket.name;
    const newKey = s3EventRecords[0].s3.object.key;
    // S3 event keys are URL-encoded, with spaces as '+'.
    const keyDecoded = decodeURIComponent(newKey.replace(/\+/g, ' '));

    console.log(`Processing ${triggerBucket} : ${keyDecoded}`)
    const bucket: GetObjectCommandInput = {
        Bucket: triggerBucket,
        Key: keyDecoded
    };
    try {
        return fileSplitter(bucket);
    } catch (err) {
        console.log(`Error ${err}`);
        throw new Error(`Error getting object ${newKey} from bucket ${triggerBucket}`)
    }
}
export const main = splitter;
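
For a quick local check, the handler can be driven with a minimal mock of the S3 put-event shape it reads (`Records[0].s3.bucket.name` and `Records[0].s3.object.key`). This is a sketch; the import path, bucket name, and key below are placeholders:

// Hypothetical local harness for the splitter Lambda.
import {main} from "./splitter";

const mockEvent = {
    Records: [{
        s3: {
            bucket: {name: "example-upload-bucket"},
            object: {key: "incoming/big-file.csv"}
        }
    }]
};

main(mockEvent, {functionName: "splitter-local-test"})
    .then(() => console.log('split complete'))
    .catch(console.error);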