amazon web services - Kinesis consumer region switching - Stack Overflow

We have a Lambda replicator function that copies data between two Kinesis streams, one in US West and one in US East. The consumer running in US West uses the KCL-enabled consumer of the Spring Cloud Stream Kinesis binder. We want to switch regions if US West has an issue, so we tried consuming in US East using the checkpoint DynamoDB table from the other region (a DynamoDB global table). Consuming from the US East stream currently fails with an error saying that the sequence number stored in the DynamoDB global table does not belong to the stream in US East.

How can we resolve this? Is it possible to mirror the messages, including their sequence numbers, across regions? If not, what is the best way to switch regions without issues when using Kinesis?

Asked Mar 20 at 13:13 by Rajesh Babu Devabhaktuni; edited Mar 23 at 17:15 by desertnaut.

3 Answers

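This replicator Lambda function helps with region switching. It copies records to the stream in the other region and checkpoints by arrival timestamp rather than by sequence number, because sequence numbers are specific to each stream.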


import json
import boto3
import base64
import os
from decimal import Decimal

# AWS Clients
source_region = os.environ.get("SOURCE_REGION", "us-west-2")
destination_region = os.environ.get("DESTINATION_REGION", "us-east-1")

kinesis_source = boto3.client('kinesis', region_name=source_region)
kinesis_destination = boto3.client('kinesis', region_name=destination_region)
dynamodb = boto3.client('dynamodb', region_name=source_region)

# Environment Variables
SOURCE_STREAM = os.environ.get("SOURCE_STREAM", "source-kinesis-stream")
DESTINATION_STREAM = os.environ.get("DESTINATION_STREAM", "destination-kinesis-stream")
DYNAMODB_TABLE = os.environ.get("DYNAMODB_TABLE", "kinesis_checkpoint_table")

def get_checkpoint(shard_id):
    """Retrieve last processed timestamp from DynamoDB."""
    response = dynamodb.get_item(
        TableName=DYNAMODB_TABLE,
        Key={'shard_id': {'S': shard_id}}
    )
    return float(response.get('Item', {}).get('timestamp', {}).get('N', 0))  # Default: 0 (epoch)

def update_checkpoint(shard_id, timestamp):
    """Update last processed timestamp in DynamoDB."""
    dynamodb.put_item(
        TableName=DYNAMODB_TABLE,
        Item={
            'shard_id': {'S': shard_id},
            # Decimal(str(...)) keeps the short decimal form instead of the float's binary expansion
            'timestamp': {'N': str(Decimal(str(timestamp)))}
        }
    )

def lambda_handler(event, context):
    """AWS Lambda function to replicate Kinesis records with timestamp-based checkpointing."""
    records = event.get('Records', [])
    if not records:
        print("No records to process")
        return {"statusCode": 200, "message": "No records found"}

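    # The shard ID is the prefix of eventID ("shardId-...:<sequence number>");
    # eventSourceARN only identifies the stream, not the shard.
    shard_id = records[0]['eventID'].split(":")[0]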
    last_checkpoint = get_checkpoint(shard_id)

    records_to_replicate = []
    latest_timestamp = None

    for record in records:
        try:
            record_timestamp = float(record['kinesis']['approximateArrivalTimestamp'])

            # Skip already processed records
            if last_checkpoint and record_timestamp <= last_checkpoint:
                continue

            # Forward the payload bytes unchanged; a JSON round trip would alter
            # the bytes and drop any record that is not valid JSON.
            records_to_replicate.append({
                'Data': base64.b64decode(record['kinesis']['data']),
                'PartitionKey': record['kinesis']['partitionKey']
            })

            latest_timestamp = record_timestamp  # Track latest timestamp

        except Exception as e:
            print(f"Error processing record: {e}")

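    # Write records to destination Kinesis stream
    # (PutRecords accepts at most 500 records per call, so keep the Lambda batch size at or below 500)
    if records_to_replicate:
        response = kinesis_destination.put_records(
            StreamName=DESTINATION_STREAM,
            Records=records_to_replicate
        )
        failed = response.get('FailedRecordCount', 0)
        print(f"Replicated {len(records_to_replicate) - failed} records, {failed} failed")
        if failed:
            # Raise so Lambda retries the batch instead of checkpointing past unwritten records
            raise RuntimeError(f"{failed} records were not written to {DESTINATION_STREAM}")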

    # Update checkpoint in DynamoDB
    if latest_timestamp:
        update_checkpoint(shard_id, latest_timestamp)

    return {"statusCode": 200, "message": "Replication with timestamp checkpointing successful"}
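
PutRecords can accept a request while rejecting individual records, so a replicator usually needs to inspect the response rather than assume every record was written. Below is a minimal sketch of chunking and retrying only the rejected entries, assuming the standard boto3 `put_records` response shape. The names `chunked`, `failed_entries`, `put_all`, and `max_attempts` are illustrative and not part of the answer's code.

```python
from typing import Any, Dict, Iterator, List

MAX_PUT_RECORDS = 500  # PutRecords accepts at most 500 records per request


def chunked(records: List[Any], size: int = MAX_PUT_RECORDS) -> Iterator[List[Any]]:
    """Yield successive slices of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


def failed_entries(batch: List[Dict[str, Any]], response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the request entries that PutRecords rejected.

    The response's Records list is in request order; a rejected entry
    carries an ErrorCode instead of a SequenceNumber.
    """
    return [
        entry
        for entry, result in zip(batch, response.get("Records", []))
        if "ErrorCode" in result
    ]


def put_all(client, stream_name: str, records: List[Dict[str, Any]], max_attempts: int = 3):
    """Send every record, retrying only rejected ones; return what is still unsent."""
    unsent = []
    for batch in chunked(records):
        pending = batch
        for _ in range(max_attempts):
            response = client.put_records(StreamName=stream_name, Records=pending)
            pending = failed_entries(pending, response)
            if not pending:
                break
        unsent.extend(pending)
    return unsent
```

In the handler above, `put_all(kinesis_destination, DESTINATION_STREAM, records_to_replicate)` could stand in for the single `put_records` call. A real deployment would likely add a backoff between attempts and decide what to do with any records that remain unsent.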

import org.springframework.cloud.stream.binder.kinesis.listener.CheckpointerAwareMessageHandler;
import org.springframework.stereotype.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;

@Component
public class KinesisCheckpointerHandler extends CheckpointerAwareMessageHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisCheckpointerHandler.class);

    public KinesisCheckpointerHandler() {
        super();
    }

    @Override
    protected void onMessage(Object payload, ShardRecordProcessorCheckpointer checkpointer) {
        LOGGER.info("Processing message: {}", payload);

        try {
            checkpointer.checkpoint();
            LOGGER.info("Checkpoint successful.");
        } catch (Exception e) {
            LOGGER.error("Checkpoint failed: {}", e.getMessage(), e);
        }
    }
}

import java.util.UUID;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

// Worker and KinesisClientLibConfiguration are KCL 1.x classes, so this
// configuration assumes the 1.x package (com.amazonaws.services.kinesis.clientlibrary).
@Configuration
public class KinesisConfig {

    @Bean
    public Worker kinesisWorker() {
        String streamName = "my-kinesis-stream";
        String consumerGroup = "my-consumer-group"; // KCL application name
        String workerId = UUID.randomUUID().toString();

        KinesisClientLibConfiguration kinesisClientLibConfiguration =
            new KinesisClientLibConfiguration(
                consumerGroup,
                streamName,
                new DefaultAWSCredentialsProviderChain(),
                workerId
            )
            // Setting the initial position to "LATEST"; it applies when no checkpoint exists
            .withInitialPositionInStream(InitialPositionInStream.LATEST);

        // Alternatively, start at a timestamp if you have one:
        // Date timestamp = Date.from(Instant.parse("2025-04-07T12:00:00Z"));
        // kinesisClientLibConfiguration =
        //     kinesisClientLibConfiguration.withTimestampAtInitialPositionInStream(timestamp);

        // MyRecordProcessorFactory is the application's own factory (not shown here)
        IRecordProcessorFactory recordProcessorFactory = new MyRecordProcessorFactory();

        Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);
        return worker;
    }
}
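
Sequence numbers are assigned by each stream, so a checkpoint taken against the US West stream cannot be resumed against the US East stream. One option is to restart by time instead. Below is a minimal sketch, assuming the consumer stores the arrival timestamp (epoch seconds) of the last record it processed. The names `failover_start_time` and `shard_iterator_request` and the 60-second safety margin are hypothetical choices, not something the question or answer specifies.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict


def failover_start_time(checkpoint_epoch_seconds: float,
                        safety_margin_seconds: float = 60.0) -> datetime:
    """Turn a stored arrival-timestamp checkpoint into a rewound UTC start time.

    The margin is an assumed allowance for replication lag and clock skew
    between the two regions.
    """
    start = datetime.fromtimestamp(checkpoint_epoch_seconds, tz=timezone.utc)
    return start - timedelta(seconds=safety_margin_seconds)


def shard_iterator_request(stream_name: str, shard_id: str,
                           start_time: datetime) -> Dict[str, Any]:
    """Build keyword arguments for a Kinesis GetShardIterator call."""
    return {
        "StreamName": stream_name,
        "ShardId": shard_id,
        "ShardIteratorType": "AT_TIMESTAMP",
        "Timestamp": start_time,
    }
```

The returned dictionary could be passed to `boto3.client("kinesis", region_name="us-east-1").get_shard_iterator(**request)`. Rewinding by a margin means some records are read twice after a switch, so the consumer should tolerate duplicates.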