spring kafka how to avoid retrying the whole batch in case of exceptions? - Stack Overflow


I am polling 1000 records from the broker using Spring Kafka, with the default consumer/listener factory beans provided by auto-configuration.

@KafkaListener(topics = "abcd-topic")
public void processRecords(List<ConsumerRecord<String, Word>> consumerRecords)

When Spring invokes the listener above, it passes the whole batch of polled records, i.e. 1000 records, in one go; consumerRecords will have a size of 1000. When an exception is thrown during processing in processRecords, Spring Kafka retries the whole batch of 1000 records.

So I set the ackMode as below:

spring.kafka.listener.ack-mode=manual_immediate

and acknowledge records after processing, i.e. acknowledgment.acknowledge(counter):

if (counter % OFFSET_COMMIT_BATCH_SIZE == 0 && counter < consumerRecords.size() - 1) {
    log.info("Processed {} records", counter);
    acknowledgment.acknowledge(counter); // in a multi-partition topic, how this works should be tested
}
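
For context, a minimal self-contained version of the listener above could look like the sketch below. It assumes a spring-kafka 3.x version where Acknowledgment.acknowledge(int index) is available (it requires AckMode.MANUAL_IMMEDIATE); Word is the question's payload type and OFFSET_COMMIT_BATCH_SIZE is a placeholder constant:

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class WordBatchListener {

    private static final int OFFSET_COMMIT_BATCH_SIZE = 100;

    @KafkaListener(topics = "abcd-topic")
    public void processRecords(List<ConsumerRecord<String, Word>> consumerRecords,
                               Acknowledgment acknowledgment) {
        for (int counter = 0; counter < consumerRecords.size(); counter++) {
            process(consumerRecords.get(counter)); // may throw
            if (counter % OFFSET_COMMIT_BATCH_SIZE == 0 && counter < consumerRecords.size() - 1) {
                log.info("Processed {} records", counter);
                // with manual_immediate, commits offsets up to and including this index
                acknowledgment.acknowledge(counter);
            }
        }
        acknowledgment.acknowledge(); // commit the remainder of the batch
    }

    private void process(ConsumerRecord<String, Word> record) {
        // single-record business logic
    }
}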

But even then, when an exception is thrown, Spring Kafka retries the whole batch. (It does not poll the broker for the same 1000 records again, so I assume the container holds those 1000 records in memory and retries without re-polling.)

I am using the default auto-configured listener, container, and error handler beans, and I can see from the logs that DefaultErrorHandler is handling the exception.

Is there any way I can tell spring-kafka not to retry the already acknowledged/committed records?
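
(For reference, spring-kafka's documented route to this is to throw a BatchListenerFailedException carrying the index of the record that failed; the DefaultErrorHandler then commits the offsets of the records before that index and retries only from the failed record onwards. A minimal sketch, assuming spring-kafka 2.8+ and the question's Word type:)

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.stereotype.Service;

@Service
public class PartialRetryListener {

    @KafkaListener(topics = "abcd-topic")
    public void processRecords(List<ConsumerRecord<String, Word>> consumerRecords) {
        for (int i = 0; i < consumerRecords.size(); i++) {
            try {
                process(consumerRecords.get(i)); // may throw
            } catch (Exception e) {
                // tells DefaultErrorHandler which record failed: offsets before
                // index i are committed, and only records from i onwards are retried
                throw new BatchListenerFailedException("failed at index " + i, e, i);
            }
        }
    }

    private void process(ConsumerRecord<String, Word> record) {
        // single-record business logic
    }
}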

asked Feb 14 at 17:51 by cbot
  • Even though you are doing the acknowledgment manually, Kafka only commits the offset when the entire batch is successfully processed; the batch is treated as a single unit. – rahulP Commented 2 days ago
  • @rahulP, please check: what you are saying should be the case for spring.kafka.listener.ack-mode=manual. I am asking about spring.kafka.listener.ack-mode=manual_immediate. – cbot Commented 2 days ago
  • Check this out, it might help: github/spring-projects/spring-kafka/issues/118 – rahulP Commented 2 days ago

1 Answer

You can approach this by processing smaller batches: instead of handling all 1000 records at once, break them into smaller chunks (100 records each) using Guava's Lists.partition(), and acknowledge after each successfully processed chunk. Because the exception is caught inside the listener, the container's error handler never sees it, so the whole batch is not retried; you handle the failed remainder yourself.

application.properties:

spring.kafka.listener.ack-mode=MANUAL
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=1000
# make your adjustments as needed
spring.kafka.consumer.fetch-min-size=1
spring.kafka.consumer.fetch-max-wait=500ms
spring.kafka.listener.type=BATCH

Your implementation:

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import com.google.common.collect.Lists;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class KafkaConsumerService {

    private static final int BATCH_SIZE = 100; // process records in smaller chunks

    @KafkaListener(topics = "abcd-topic")
    public void processRecords(List<ConsumerRecord<String, Word>> records,
                               Acknowledgment acknowledgment) {
        // split the 1000 polled records into chunks of 100
        List<List<ConsumerRecord<String, Word>>> batches = Lists.partition(records, BATCH_SIZE);

        int processedCount = 0;

        for (List<ConsumerRecord<String, Word>> batch : batches) {
            try {
                processBatch(batch);
                processedCount += batch.size();
                // acknowledge after each successful chunk (with ack-mode=MANUAL,
                // the queued commit is performed when the listener exits)
                acknowledgment.acknowledge();
                log.info("Processed and acknowledged {} records", processedCount);
            } catch (Exception e) {
                log.error("Error processing batch starting at offset {}",
                        batch.get(0).offset(), e);
                // the exception is swallowed here, so the container's error handler
                // never sees it and does not retry the whole batch; only the
                // records from the failed chunk onwards are handed off
                handleFailedBatch(records.subList(processedCount, records.size()));
                break;
            }
        }
    }

    private void processBatch(List<ConsumerRecord<String, Word>> batch) {
        // per-chunk business logic
    }

    private void handleFailedBatch(List<ConsumerRecord<String, Word>> failed) {
        // decide what to do with the failed records (see below)
    }
}

You also need to decide what handleFailedBatch() should do with the failed records: for example, log them, park them for reprocessing, or publish them to a dead-letter topic.
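
One possibility, sketched here assuming a KafkaTemplate<String, Word> bean is available and using a hypothetical dead-letter topic name "abcd-topic.DLT", is to republish everything from the failed chunk onwards:

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class FailedBatchHandler {

    private final KafkaTemplate<String, Word> kafkaTemplate;

    public FailedBatchHandler(KafkaTemplate<String, Word> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // republishes the failed records to a dead-letter topic;
    // "abcd-topic.DLT" is a hypothetical topic name
    public void handleFailedBatch(List<ConsumerRecord<String, Word>> failed) {
        failed.forEach(record ->
                kafkaTemplate.send("abcd-topic.DLT", record.key(), record.value()));
    }
}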
