I am trying to stream data from a large CSV file into readline. I tried simply piping the read stream from S3 into the readline input; however, I ran into an error because S3 only allows a connection to stay open for a certain amount of time.
I am creating the stream from S3 like so:
import * as AWS from 'aws-sdk';
import {Readable} from 'stream';
import {s3Env} from '../config';

export default async function createAWSStream(): Promise<Readable> {
    return new Promise((resolve, reject) => {
        const params = {
            Bucket: s3Env.bucket,
            Key: s3Env.key
        };

        try {
            const s3 = new AWS.S3({
                accessKeyId: s3Env.accessKey,
                secretAccessKey: s3Env.secret
            });
            s3.headObject(params, (error, data) => {
                if (error) {
                    return reject(error);
                }
                const stream = s3.getObject(params).createReadStream();
                resolve(stream);
            });
        } catch (error) {
            reject(error);
        }
    });
}
Then I am piping it into readline:
import * as readline from 'readline';
import createAWSStream from './createAWSStream';

export const readCSVFile = async function(): Promise<void> {
    const rStream = await createAWSStream();
    const lineReader = readline.createInterface({
        input: rStream
    });

    for await (const line of lineReader) {
        // process line
    }
}
I found that the timeout for S3 connections was set to 120000 ms (2 minutes). I tried simply raising the timeout, but then I ran into further timeout issues from the HTTPS connection.
How can I stream data from AWS S3 the right way without setting a bunch of timeouts to some extremely large timeframe?
I was able to work out a solution for this using the AWS S3 Range property and a custom readable stream built with the NodeJS Stream API.
By using this "smart stream" I was able to grab the data in chunks through separate requests to the S3 instance. By grabbing the data in chunks, I avoided any timeout errors and also created a more efficient stream. The NodeJS Readable superclass handles the buffer so as not to overload the input to readline, and it automatically handles pausing and resuming the stream.
This class made it possible to stream large files from AWS S3 very easily:
import {Readable, ReadableOptions} from "stream";
import type {S3} from "aws-sdk";

export class SmartStream extends Readable {
    _currentCursorPosition = 0; // Holds the current starting position for our range queries
    _s3DataRange = 2048 * 1024; // Amount of bytes to grab per request (I have jacked this up for HD video files)
    _maxContentLength: number; // Total number of bytes in the file
    _s3: S3; // AWS.S3 instance
    _s3StreamParams: S3.GetObjectRequest; // Parameters passed into the s3.getObject method

    constructor(
        parameters: S3.GetObjectRequest,
        s3: S3,
        maxLength: number,
        // You can pass any ReadableStream options to the NodeJS Readable super class here
        // For this example we won't use this, however I left it in to be more robust
        nodeReadableStreamOptions?: ReadableOptions
    ) {
        super(nodeReadableStreamOptions);
        this._maxContentLength = maxLength;
        this._s3 = s3;
        this._s3StreamParams = parameters;
    }

    _read() {
        if (this._currentCursorPosition > this._maxContentLength) {
            // If the current position is greater than the amount of bytes in the file
            // We push null into the buffer, NodeJS ReadableStream will see this as the end of file (EOF) and emit the 'end' event
            this.push(null);
        } else {
            // Calculate the range of bytes we want to grab
            const range = this._currentCursorPosition + this._s3DataRange;
            // If the range is greater than the total number of bytes in the file
            // We adjust the range to grab the remaining bytes of data
            const adjustedRange =
                range < this._maxContentLength ? range : this._maxContentLength;
            // Set the Range property on our s3 stream parameters
            this._s3StreamParams.Range = `bytes=${this._currentCursorPosition}-${adjustedRange}`;
            // Update the current range beginning for the next go
            this._currentCursorPosition = adjustedRange + 1;
            // Grab the range of bytes from the file
            this._s3.getObject(this._s3StreamParams, (error, data) => {
                if (error) {
                    // If we encounter an error grabbing the bytes
                    // We destroy the stream, NodeJS ReadableStream will emit the 'error' event
                    this.destroy(error);
                } else {
                    // We push the data into the stream buffer
                    this.push(data.Body);
                }
            });
        }
    }
}
To work it into the createAWSStream function, I simply replaced the line where I created the read stream:

const stream = s3.getObject(params).createReadStream();

with an instance of my SmartStream class, passing in the S3 params object, the S3 instance, and the content length of the data:

const stream = new SmartStream(params, s3, data.ContentLength);
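For reference, the full createAWSStream function from the question might look something like this after the change. This is a minimal sketch based on the snippets above; the s3Env config names come from the question, and the './SmartStream' import path is an assumption about where the class lives:

import * as AWS from 'aws-sdk';
import {s3Env} from '../config';
import {SmartStream} from './SmartStream'; // assumed module path for the class above

export default async function createAWSStream(): Promise<SmartStream> {
    return new Promise((resolve, reject) => {
        const params = {
            Bucket: s3Env.bucket,
            Key: s3Env.key
        };

        try {
            const s3 = new AWS.S3({
                accessKeyId: s3Env.accessKey,
                secretAccessKey: s3Env.secret
            });
            // headObject supplies the ContentLength that SmartStream needs for its range math
            s3.headObject(params, (error, data) => {
                if (error) {
                    return reject(error);
                }
                // Hand the same params and s3 instance to SmartStream along with the total size
                const stream = new SmartStream(params, s3, data.ContentLength);
                resolve(stream);
            });
        } catch (error) {
            reject(error);
        }
    });
}

The readCSVFile function from the question can then consume this stream unchanged, since SmartStream is just a NodeJS Readable.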