Using Spring Boot 3.4.3 and Spring Kafka 3.3.3, if the producers sit idle for more than a week (longer than the broker's default transactional.id.expiration.ms of 7 days) I get the following error:
org.springframework.kafka.core.KafkaProducerException: Failed to send
at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$9(KafkaTemplate.java:891) ~[spring-kafka-3.3.1.jar!/:3.3.1]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer$1.onCompletion(DefaultKafkaProducerFactory.java:1111) ~[spring-kafka-3.3.1.jar!/:3.3.1]
at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1568) ~[kafka-clients-7.8.0-ce.jar!/:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:312) ~[kafka-clients-7.8.0-ce.jar!/:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:200) ~[kafka-clients-7.8.0-ce.jar!/:na]
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:1155) ~[kafka-clients-7.8.0-ce.jar!/:na]
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:473) ~[kafka-clients-7.8.0-ce.jar!/:na]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:336) ~[kafka-clients-7.8.0-ce.jar!/:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250) ~[kafka-clients-7.8.0-ce.jar!/:na]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) ~[kafka-clients-7.8.0-ce.jar!/:na]
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
Template code:
@Configuration
@Slf4j
public class KafkaMigrationsProducerConfig {

    final KafkaProperties kafkaProperties;
    private final MeterRegistry meterRegistry;

    public KafkaMigrationsProducerConfig(KafkaProperties kafkaProperties, MeterRegistry meterRegistry) {
        this.kafkaProperties = kafkaProperties;
        this.meterRegistry = meterRegistry;
    }
    // https://docs.spring.io/spring-kafka/docs/2.8.11/reference/html/#transactions
    // Starting with version 2.5.8, you can now configure the maxAge property on the producer factory. This is useful when using transactional
    // producers that might lay idle for the broker's transactional.id.expiration.ms. With current kafka-clients, this can cause a ProducerFencedException
    // without a rebalance. By setting the maxAge to less than transactional.id.expiration.ms, the factory will refresh the producer if it is past its max age.
    @Bean
    public DefaultKafkaProducerFactoryCustomizer producerFactoryCustomizer() {
        return producerFactory -> producerFactory.setMaxAge(Duration.ofDays(1));
    }

    @Bean
    public ProducerFactory<String, Object> migrationsProducerFactory() {
        Map<String, Object> configProps = KafkaPropertiesHelper.buildRequiredPropsOnly(kafkaProperties.buildProducerProperties(null));
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaSerializer.class);
        configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "migr-core-tx- " + UUID.randomUUID().toString());
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        configProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList(KafkaProducerMigrationsInterceptor.class));
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        // retry
        configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
        // timeouts
        configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "300000"); // 5 minutes
        configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "300000"); // 5 minutes
        ProducerFactory<String, Object> result = new DefaultKafkaProducerFactory<>(configProps);
        result.addListener(new MicrometerProducerListener<>(meterRegistry));
        return result;
    }

    @Bean
    public KafkaTransactionManager<String, Object> kafkaMigrationsTransactionManager(final ProducerFactory<String, Object> migrationsFactoryTransactional) {
        return new KafkaTransactionManager<>(migrationsFactoryTransactional);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaMigrationsTransactionalTemplate(final ProducerFactory<String, Object> migrationsFactoryTransactional) {
        return new KafkaTemplate<>(migrationsFactoryTransactional);
    }
}
And this is the Kafka producer code:
@Component
@Slf4j
public class MigrationKafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplateTransactional;

    @Transactional
    public void send(String topic, List<DTO> payload) throws IAMigrationsException {
        CountDownLatch latch = new CountDownLatch(payload.size());
        payload.forEach(item -> {
            CompletableFuture<SendResult<String, Object>> future = kafkaTemplateTransactional.send(topic, item);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    assert result != null;
                    log.info("sent the message correctly with offset=[{}]", result.getRecordMetadata().offset());
                } else {
                    log.error("producer could not send the message {}", ex.getMessage(), ex);
                }
                latch.countDown();
            });
        });
        try {
            if (latch.await(5, TimeUnit.MINUTES)) {
                log.info("{} events (including start and stop) sent ok", payload.size());
            } else {
                log.error("Timeout - not all sends completed within 5 minutes");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for sends to complete {}", e.getMessage());
        }
    }

    public void close() {
        kafkaTemplateTransactional.getProducerFactory().reset();
    }
}
ProducerInterceptor:
@Slf4j
public class KafkaProducerMigrationsInterceptor implements ProducerInterceptor<String, DTO> {

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ProducerRecord<String, DTO> onSend(ProducerRecord<String, DTO> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (Objects.nonNull(exception)) {
            log.error("Error on acknowledgement {} topic: {} offset: {}", exception.getMessage(), metadata.topic(), metadata.offset());
        } else {
            log.info("Server acknowledge event on topic: {} offset: {}", metadata.topic(), metadata.offset());
        }
    }

    @Override
    public void close() {
        // do nothing at this moment
    }
}
ProducerConfig values:
acks = -1
auto.include.jmx.reporter = true
batch.size = 16384
bootstrap.servers = [PLAINTEXT://localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = ia-ds-deploy-test-producer-2
compression.gzip.level = -1
compression.lz4.level = 9
compression.type = none
compression.zstd.level = 3
confluent.lkc.id = null
confluent.proxy.protocol.client.address = null
confluent.proxy.protocol.client.mode = PROXY
confluent.proxy.protocol.client.port = null
confluent.proxy.protocol.client.version = NONE
connections.max.idle.ms = 540000
delivery.timeout.ms = 300000
enable.idempotence = true
enable.metrics.push = true
interceptor.classes = [class KafkaProducerMigrationsInterceptor]
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metadata.recovery.strategy = none
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.adaptive.partitioning.enable = true
partitioner.availability.timeout.ms = 0
partitioner.class = null
partitioner.ignore.keys = false
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 300000
retries = 2147483647
retry.backoff.max.ms = 1000
retry.backoff.ms = 1000
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.header.urlencode = false
sasl.oauthbearer.iat.validation.enabled = false
sasl.oauthbearer.jti.validation.enabled = false
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = migr-core-tx- 67ba5de5-1f21-478c-a223-7072e4ce6de11
value.serializer = class KafkaSerializer
I've read that the issue is related to the transactional.id.expiration.ms setting in the Kafka broker properties. I'm attempting to reproduce the error locally, so I've started a local Kafka broker and set transactional.id.expiration.ms=5000 (5 seconds). How can I create a unit test that reproduces this error against my local Kafka broker?
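This is a minimal sketch of what I have in mind, not a verified reproduction: the topic name migrations-test is a placeholder, I'm assuming my custom KafkaSerializer can serialize a plain String payload (otherwise a DTO would go there), and, if I read the broker docs correctly, expired transactional ids are only purged by a periodic cleanup task (transaction.remove.expired.transaction.cleanup.interval.ms, default one hour), so that setting would have to be lowered on the local broker as well.

import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class InvalidPidMappingReproductionTest {

    @Autowired
    private KafkaTemplate<String, Object> kafkaMigrationsTransactionalTemplate;

    @Test
    void idleTransactionalProducerShouldHitInvalidPidMapping() throws Exception {
        // First transactional send: the broker registers the
        // transactional.id -> producer id (PID) mapping.
        kafkaMigrationsTransactionalTemplate.executeInTransaction(ops ->
                ops.send("migrations-test", "warm-up").join());

        // Stay idle longer than transactional.id.expiration.ms (5 s on my local
        // broker) plus the broker's cleanup pass, so the mapping is removed.
        Thread.sleep(30_000);

        // The cached producer still carries the expired PID, so this send should
        // fail with InvalidPidMappingException as the root cause.
        assertThatThrownBy(() ->
                kafkaMigrationsTransactionalTemplate.executeInTransaction(ops ->
                        ops.send("migrations-test", "after-expiry").join()))
                .hasRootCauseInstanceOf(InvalidPidMappingException.class);
    }
}

The idea is that the factory keeps handing back the cached producer (maxAge is 1 day), so the second send should be rejected by the broker with InvalidPidMappingException as the root cause, but I'm not sure the timing is deterministic.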
Setting setMaxAge to 1 day in production did not solve my problem. The fixes suggested in other SO questions (Facing org.apache.kafka.common.errors.InvalidPidMappingException in spring kafka, How to correct handle InvalidPidMappingException) and in /@micaelaturrin/invalidpidmappingexception-issue-in-a-kafka-transactional-producer-ee1a6503bf31 do not seem to fix the issue either.