I am having trouble understanding how exactly the unparameterized get on Kafka's FutureRecordMetadata class works. I have a method in a service where I produce to a topic and want to return only when the broker ack's the record. I have configured my producer with the following properties:
kafka.linger-ms=5
kafka.acks=all
kafka.enable-idempotence-config=true
kafka.delivery-timeout-ms=25000
kafka.max-in-flight=5
kafka.retry-backoff-ms=1000
kafka.request-timeout-ms=20000
Here is the service method:
Future<RecordMetadata> result = this.someProducer.send(
new ProducerRecord<>(
this.someTopic,
null,
someDTO
)
);
try {
result.get();
return someDTO.getId();
} catch (ExecutionException e) {
throw new NotProducedException("Failed to produce : " + e.getMessage(), e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new NotProducedException("Interrupted while producing", e);
}
My question is not how to configure the get to wait for 25 seconds, but rather why do I have to wait for 60 seconds rather then the ones I have specified in my timeout to get a response?
I am having trouble understanding how exactly the unparameterized get on Kafka's FutureRecordMetadata class works. I have a method in a service where I produce to a topic and want to return only when the broker ack's the record. I have configured my producer with the following properties:
kafka.linger-ms=5
kafka.acks=all
kafka.enable-idempotence-config=true
kafka.delivery-timeout-ms=25000
kafka.max-in-flight=5
kafka.retry-backoff-ms=1000
kafka.request-timeout-ms=20000
Here is the service method:
Future<RecordMetadata> result = this.someProducer.send(
new ProducerRecord<>(
this.someTopic,
null,
someDTO
)
);
try {
result.get();
return someDTO.getId();
} catch (ExecutionException e) {
throw new NotProducedException("Failed to produce : " + e.getMessage(), e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new NotProducedException("Interrupted while producing", e);
}
My question is not how to configure the get to wait for 25 seconds, but rather why do I have to wait for 60 seconds rather then the ones I have specified in my timeout to get a response?
Share Improve this question asked Mar 17 at 12:18 Gei TraykovGei Traykov 11 bronze badge 01 Answer
Reset to default 0The reason was the max.block.ms
config was not set and it defaults to 60000ms.
https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#max-block-ms