I am polling 1000 records from the broker using spring-kafka, with the default consumer/listener factory beans provided by auto-configuration.
@KafkaListener(topics = "abcd-topic")
public void processRecords(List<ConsumerRecord<String, Word>> consumerRecords)
When Spring invokes this listener, it passes the whole polled batch in one go, i.e. consumerRecords will contain all 1000 records. When an exception is thrown during processing in processRecords, spring-kafka retries the whole batch of 1000 records.
So I set the ackMode as below:
spring.kafka.listener.ack-mode=manual_immediate
and I am acknowledging records after processing, i.e. acknowledgment.acknowledge(counter):
if (counter % OFFSET_COMMIT_BATCH_SIZE == 0 && counter < consumerRecords.size() - 1) {
    log.info("Processed {} records", counter);
    acknowledgment.acknowledge(counter); // in a multi-partition topic, how this works should be tested
}
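For context, here is a minimal sketch of how that call might sit inside the listener (counter, OFFSET_COMMIT_BATCH_SIZE, and log come from the snippet above; the loop structure and the process method are assumptions, not the exact original code):

@KafkaListener(topics = "abcd-topic")
public void processRecords(List<ConsumerRecord<String, Word>> consumerRecords,
                           Acknowledgment acknowledgment) {
    for (int counter = 0; counter < consumerRecords.size(); counter++) {
        process(consumerRecords.get(counter)); // hypothetical per-record business logic
        // periodically commit offsets up to and including this index
        if (counter % OFFSET_COMMIT_BATCH_SIZE == 0 && counter < consumerRecords.size() - 1) {
            log.info("Processed {} records", counter);
            acknowledgment.acknowledge(counter);
        }
    }
    acknowledgment.acknowledge(); // acknowledge whatever remains of the batch
}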
But even then, when an exception is thrown, spring-kafka retries the whole batch. (It does not poll the broker for the same 1000 records again, so I assume the container holds those 1000 records in memory and retries without re-polling.)
I am using the default listener, container, and error handler beans provided by auto-configuration, and I can see from the logs that DefaultErrorHandler is handling the exception.
Is there any way I can ask spring-kafka not to retry the already acknowledged/committed records?
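For reference, DefaultErrorHandler can retry from a specific record instead of the whole batch if the listener throws BatchListenerFailedException identifying where the failure occurred; offsets of the records before that index are committed. A minimal sketch (the process method is a hypothetical stand-in for the business logic):

@KafkaListener(topics = "abcd-topic")
public void processRecords(List<ConsumerRecord<String, Word>> consumerRecords) {
    for (int i = 0; i < consumerRecords.size(); i++) {
        try {
            process(consumerRecords.get(i)); // hypothetical per-record business logic
        } catch (Exception e) {
            // tells DefaultErrorHandler to commit offsets before index i and
            // seek/retry only from the failed record onwards
            throw new BatchListenerFailedException("record " + i + " failed", e, i);
        }
    }
}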
1 Answer
You can approach this by processing smaller batches: instead of processing all 1000 records at once, break them into smaller batches (100 records each) using Guava's Lists.partition(). You can then acknowledge after each successfully processed batch.
application.properties:
spring.kafka.listener.ack-mode=MANUAL
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=1000
# adjust fetch tuning as needed (Spring Boot's names for fetch.min.bytes / fetch.max.wait.ms)
spring.kafka.consumer.fetch-min-size=1
spring.kafka.consumer.fetch-max-wait=500ms
spring.kafka.listener.type=BATCH
Your implementation:

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import com.google.common.collect.Lists;

@Service
public class KafkaConsumerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
    private static final int BATCH_SIZE = 100; // process records in smaller chunks

    @KafkaListener(topics = "abcd-topic")
    public void processRecords(List<ConsumerRecord<String, Word>> records,
                               Acknowledgment acknowledgment) {
        List<List<ConsumerRecord<String, Word>>> batches = Lists.partition(records, BATCH_SIZE);
        int processedCount = 0;
        for (List<ConsumerRecord<String, Word>> batch : batches) {
            try {
                processBatch(batch);
                processedCount += batch.size();
                // Acknowledge after each successful batch
                acknowledgment.acknowledge();
                log.info("Processed and acknowledged {} records", processedCount);
            } catch (Exception e) {
                log.error("Error processing batch starting at offset {}", batch.get(0).offset(), e);
                // Only retry from the failed batch onwards
                handleFailedBatch(records.subList(processedCount, records.size()));
                break;
            }
        }
    }
}
You also need to add exception handling for your batches, i.e. decide what handleFailedBatch should do with the records that could not be processed.
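For completeness, hypothetical stubs for the two methods the listener above calls (the names processBatch and handleFailedBatch come from the answer's code; the bodies here are placeholder assumptions and belong inside KafkaConsumerService):

private void processBatch(List<ConsumerRecord<String, Word>> batch) {
    for (ConsumerRecord<String, Word> record : batch) {
        // hypothetical: per-record business logic goes here
    }
}

private void handleFailedBatch(List<ConsumerRecord<String, Word>> failedRecords) {
    // hypothetical: decide what to do with the unprocessed remainder,
    // e.g. log it, re-publish it, or route it to a dead-letter topic
    log.warn("Deferring {} unprocessed records", failedRecords.size());
}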
Your answer uses spring.kafka.listener.ack-mode=manual. I am asking for spring.kafka.listener.ack-mode=manual_immediate. – cbot
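On the distinction raised in that comment: with MANUAL, acknowledgments are queued and the offsets are committed after the listener returns, while MANUAL_IMMEDIATE commits as soon as acknowledge() is called. A hedged sketch of setting MANUAL_IMMEDIATE programmatically (this assumes you define your own container factory instead of relying on the auto-configured one):

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Word> kafkaListenerContainerFactory(
        ConsumerFactory<String, Word> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Word> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true); // deliver the polled records as a batch
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}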