
java - Spring Boot Kafka producer and InvalidPidMappingException - Stack Overflow


Using Spring Boot 3.4.3 and Spring Kafka 3.3.3, if the producers are not used for more than a week I get the following errors:

org.springframework.kafka.core.KafkaProducerException: Failed to send
    at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$9(KafkaTemplate.java:891) ~[spring-kafka-3.3.1.jar!/:3.3.1]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer$1.onCompletion(DefaultKafkaProducerFactory.java:1111) ~[spring-kafka-3.3.1.jar!/:3.3.1]
    at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1568) ~[kafka-clients-7.8.0-ce.jar!/:na]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:312) ~[kafka-clients-7.8.0-ce.jar!/:na]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:200) ~[kafka-clients-7.8.0-ce.jar!/:na]
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:1155) ~[kafka-clients-7.8.0-ce.jar!/:na]
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:473) ~[kafka-clients-7.8.0-ce.jar!/:na]
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:336) ~[kafka-clients-7.8.0-ce.jar!/:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250) ~[kafka-clients-7.8.0-ce.jar!/:na]
    at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
    at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) ~[kafka-clients-7.8.0-ce.jar!/:na]
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
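
For context (a note added here, not part of the original question): the broker drops the mapping from a transactional.id to its producer id once the transactional.id has been idle for longer than transactional.id.expiration.ms, which defaults to 7 days. That lines up with the error appearing only after the producers sit unused for more than a week. The relevant broker-side settings and their default values (verify them on your own cluster):

# server.properties (broker side) - default values, shown for context
transactional.id.expiration.ms=604800000                            # 7 days of idleness before the id expires
transaction.remove.expired.transaction.cleanup.interval.ms=3600000  # expired ids are only purged on this interval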

Template code:

@Configuration
@Slf4j    
public class KafkaMigrationsProducerConfig {
    final KafkaProperties kafkaProperties;
    private final MeterRegistry meterRegistry;

    public KafkaMigrationsProducerConfig(KafkaProperties kafkaProperties, MeterRegistry meterRegistry) {
        this.kafkaProperties = kafkaProperties;
        this.meterRegistry = meterRegistry;
    }

// https://docs.spring.io/spring-kafka/docs/2.8.11/reference/html/#transactions
//    Starting with version 2.5.8, you can now configure the maxAge property on the producer factory. This is useful when using transactional
//    producers that might lay idle for the broker's transactional.id.expiration.ms. With current kafka-clients, this can cause a ProducerFencedException
//    without a re-balance. By setting the maxAge to less than transactional.id.expiration.ms, the factory will refresh the producer if it is past its max age.
    @Bean
    public DefaultKafkaProducerFactoryCustomizer producerFactoryCustomizer() {
        return producerFactory -> producerFactory.setMaxAge(Duration.ofDays(1));
    }

    @Bean
    public ProducerFactory<String, Object> migrationsProducerFactory() {
        Map<String, Object> configProps = KafkaPropertiesHelper.buildRequiredPropsOnly(kafkaProperties.buildProducerProperties(null));
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaSerializer.class);
        configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "migr-core-tx- " + UUID.randomUUID().toString());
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        configProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList(KafkaProducerMigrationsInterceptor.class));
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        //retry
        configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
        //timeouts
        configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "300000");//5 minutes
        configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "300000");//5 minutes
        ProducerFactory<String, Object> result = new DefaultKafkaProducerFactory<>(configProps);
        result.addListener(new MicrometerProducerListener<>(meterRegistry));

        return result;
    }

    @Bean
    public KafkaTransactionManager<String, Object> kafkaMigrationsTransactionManager(final ProducerFactory<String, Object> migrationsFactoryTransactional) {
        return new KafkaTransactionManager<>(migrationsFactoryTransactional);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaMigrationsTransactionalTemplate(final ProducerFactory<String, Object> migrationsFactoryTransactional) {
        return new KafkaTemplate<>(migrationsFactoryTransactional);
    }
} 
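
Worth noting (not in the original post): Spring Boot applies DefaultKafkaProducerFactoryCustomizer beans only to the producer factory it auto-configures. A factory built by hand, like migrationsProducerFactory() above, never passes through that customizer, so the setMaxAge(Duration.ofDays(1)) call may not affect the factory that is actually used. A minimal sketch of setting it directly inside migrationsProducerFactory(), under that assumption:

// Sketch only: declare the local as DefaultKafkaProducerFactory (setMaxAge is not
// on the ProducerFactory interface) and set maxAge on the hand-built factory itself.
DefaultKafkaProducerFactory<String, Object> result = new DefaultKafkaProducerFactory<>(configProps);
result.setMaxAge(Duration.ofDays(1)); // keep this below the broker's transactional.id.expiration.ms
result.addListener(new MicrometerProducerListener<>(meterRegistry));
return result;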

and this is the code for Kafka producer

@Component
@Slf4j
public class MigrationKafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplateTransactional;

    @Transactional
    public void send(String topic, List<DTO> payload) throws IAMigrationsException {
        CountDownLatch latch = new CountDownLatch(payload.size());

        payload.forEach(item -> {
            CompletableFuture<SendResult<String, Object>> future = kafkaTemplateTransactional.send(topic, item);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    assert result != null;
                    log.info("sent the message correctly with offset=[{}]", result.getRecordMetadata().offset());
                } else {
                    log.error("producer could not send the message {}", ex.getMessage(), ex);
                }
                latch.countDown();
            });
        });

        try {
            if (latch.await(5, TimeUnit.MINUTES)) {
                log.info("{} events (including start and stop) sent ok", payload.size());
            } else {
                log.error("Timeout - not all send callbacks completed within 5 minutes");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for send callbacks {}", e.getMessage());
        }
    }

    public void close() {
        kafkaTemplateTransactional.getProducerFactory().reset();
    }
} 
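
One mitigation suggested in the articles linked further down is to treat InvalidPidMappingException as fatal for the cached producer: reset the factory so the next transaction creates a fresh producer and obtains a new producer id from the broker. A minimal sketch (added here for illustration; the helper name and the call site, e.g. the whenComplete callback above, are assumptions):

import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.springframework.kafka.core.KafkaTemplate;

class PidMappingRecovery {

    // Walk the cause chain; if InvalidPidMappingException caused the failure,
    // drop all cached producers so the next send builds a new one.
    static void recoverIfPidMappingInvalid(KafkaTemplate<String, Object> template, Throwable failure) {
        for (Throwable cause = failure; cause != null; cause = cause.getCause()) {
            if (cause instanceof InvalidPidMappingException) {
                template.getProducerFactory().reset();
                return;
            }
        }
    }
}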

ProducerInterceptor (class declaration added for completeness; the name comes from the interceptor.classes config below):

@Slf4j
public class KafkaProducerMigrationsInterceptor implements ProducerInterceptor<String, DTO> {

    @Override
    public void configure(Map<String, ?> configs) {
        // no custom configuration needed
    }

    @Override
    public ProducerRecord<String, DTO> onSend(ProducerRecord<String, DTO> record) {
       return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (Objects.nonNull(exception)) {
            log.error("Error on acknowledgement {} topic: {} offset: {}", exception.getMessage(), metadata.topic(), metadata.offset());
        } else {
            log.info("Server acknowledged event on topic: {} offset: {}", metadata.topic(), metadata.offset());
        }
    }

    @Override
    public void close() {
        //do nothing at this moment
    }

}

ProducerConfig values:

acks = -1
auto.include.jmx.reporter = true
batch.size = 16384
bootstrap.servers = [PLAINTEXT://localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = ia-ds-deploy-test-producer-2
compression.gzip.level = -1
compression.lz4.level = 9
compression.type = none
compression.zstd.level = 3
confluent.lkc.id = null
confluent.proxy.protocol.client.address = null
confluent.proxy.protocol.client.mode = PROXY
confluent.proxy.protocol.client.port = null
confluent.proxy.protocol.client.version = NONE
connections.max.idle.ms = 540000
delivery.timeout.ms = 300000
enable.idempotence = true
enable.metrics.push = true
interceptor.classes = [class KafkaProducerMigrationsInterceptor]
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metadata.recovery.strategy = none
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.adaptive.partitioning.enable = true
partitioner.availability.timeout.ms = 0
partitioner.class = null
partitioner.ignore.keys = false
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 300000
retries = 2147483647
retry.backoff.max.ms = 1000
retry.backoff.ms = 1000
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.header.urlencode = false
sasl.oauthbearer.iat.validation.enabled = false
sasl.oauthbearer.jti.validation.enabled = false
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = migr-core-tx- 67ba5de5-1f21-478c-a223-7072e4ce6de11
value.serializer = class KafkaSerializer

I've read that the issue is related to the transactional.id.expiration.ms setting in the Kafka broker properties. To reproduce the error locally, I've started a local Kafka broker and set transactional.id.expiration.ms=5000 (5 seconds). How can I create a unit test that reproduces this error against my local broker?
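
A sketch of such a test (added here for illustration, not from the original post; it assumes a broker on localhost:9092 with transactional.id.expiration.ms=5000, an existing topic repro-topic, and a sleep comfortably past the expiration; since the broker only purges expired ids on its cleanup interval, lowering transaction.remove.expired.transaction.cleanup.interval.ms as well may be necessary for a fast repro):

import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

class InvalidPidMappingReproTest {

    @Test
    void pidMappingExpiresWhileProducerIsIdle() throws Exception {
        Map<String, Object> props = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props);
        factory.setTransactionIdPrefix("repro-tx-" + UUID.randomUUID());
        KafkaTemplate<String, String> template = new KafkaTemplate<>(factory);

        try {
            // First transaction: the broker assigns a producer id to our transactional.id,
            // and the factory caches the producer afterwards.
            template.executeInTransaction(ops -> ops.send("repro-topic", "first"));

            // Sit idle past transactional.id.expiration.ms (5 s here) so the broker
            // expires the transactional.id -> producer id mapping.
            Thread.sleep(10_000);

            // Second transaction reuses the cached producer with its now-stale producer id;
            // the failure's root cause should be InvalidPidMappingException.
            Assertions.assertThrows(Exception.class,
                    () -> template.executeInTransaction(ops -> ops.send("repro-topic", "second")));
        } finally {
            factory.destroy();
        }
    }
}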

Setting maxAge to 1 day in production did not solve my problem. The fixes suggested in other SO questions (Facing org.apache.kafka.common.errors.InvalidPidMappingException in spring kafka, How to correct handle InvalidPidMappingException) and in medium.com/@micaelaturrin/invalidpidmappingexception-issue-in-a-kafka-transactional-producer-ee1a6503bf31 do not seem to fix the issue either.
