java - Invalid value .kafka.SslBundleSslEngineFactory for configuration ssl.engine.factory.class: Class .SslBundleSslEngineFacto

I would like to consume messages from Kafka using Reactor Kafka and a Spring Boot SSL bundle.

Previously, I had a consumer that looked like this (it works fine; I can consume messages from an SSL-secured Kafka cluster):

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver(final MeterRegistry meterRegistry, final ObservationRegistry observationRegistry) {
        final Map<String, Object> properties = new HashMap<>();
        properties.put("security.protocol", "SSL");
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/keystore.jks");
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keyStorePassphrase");
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks");
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "trustStorePassphrase");
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "testtestfour");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testtesttfour");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        final ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(properties);
        return KafkaReceiver.create(receiverOptions
                .addAssignListener(p -> LOGGER.info("partitions assigned {}", p))
                .addRevokeListener(p -> LOGGER.info("partitions revoked {}", p))
                .consumerListener(new MicrometerConsumerListener(meterRegistry))
                .withObservation(observationRegistry)
                .subscription(Collections.singleton("topic")));
    }

Now I am hoping to use an SSL bundle instead:

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver(MeterRegistry meterRegistry, ObservationRegistry observationRegistry, SslBundles sslBundles) {
        final Map<String, Object> properties = new HashMap<>();
        properties.put("security.protocol", "SSL");
        properties.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, SslBundleSslEngineFactory.class.getName());
        // The bundle name must match the one used in the spring.ssl.bundle.pem.* properties.
        properties.put(SslBundle.class.getName(), sslBundles.getBundle("kafka-bundle"));
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerGroup);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        final ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(properties);
        return KafkaReceiver.create(receiverOptions
                .addAssignListener(p -> LOGGER.info("partitions assigned {}", p))
                .addRevokeListener(p -> LOGGER.info("partitions revoked {}", p))
                .consumerListener(new MicrometerConsumerListener(meterRegistry))
                .withObservation(observationRegistry)
                .subscription(Collections.singleton("topicSource")));
    }

The bundle is defined with these properties:

spring.ssl.bundle.pem.kafka-bundle.keystore.private-key=[...]
spring.ssl.bundle.pem.kafka-bundle.keystore.certificate=[...]
spring.ssl.bundle.pem.kafka-bundle.truststore.certificate=[...]

The values of the three properties are in PEM format, for example:

-----BEGIN CERTIFICATE-----
MIID1zCCAr+gAwIBAgIUNM5QQv8IzVQsgSmmdPQNaqyzWs4wDQYJKoZIhvcNAQEL
BQAwezELMAkGA1UEBhMCWFgxEjAQBgNVBAgMCVN0YXRlTmFtZTERMA8GA1UEBwwI
...
V0IJjcmYjEZbTvpjFKznvaFiOUv+8L7jHQ1/Yf+9c3C8gSjdUfv88m17pqYXd+Ds
HEmfmNNjht130UyjNCITmLVXyy5p35vWmdf95U3uEbJSnNVtXH8qRmN9oK9mUpDb
ngX6JBJI7fw7tXoqWSLHNiBODM88fUlQSho8
-----END CERTIFICATE-----
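
If the PEM values themselves are ever in doubt, a quick structural check can rule them out. The following is only a minimal sketch using the JDK alone: `PemBlocks` and `decode` are hypothetical names, not part of Spring Boot or Kafka, and the check only confirms that each BEGIN/END pair encloses valid Base64. It does not validate the certificate or key contents.

```java
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only.
final class PemBlocks {

    // Matches one PEM block; the END label must repeat the BEGIN label.
    private static final Pattern BLOCK = Pattern.compile(
            "-----BEGIN ([A-Z ]+)-----(.*?)-----END \\1-----", Pattern.DOTALL);

    /** Decodes the Base64 body of every PEM block found in the given text. */
    static List<byte[]> decode(String pem) {
        List<byte[]> blocks = new ArrayList<>();
        Matcher m = BLOCK.matcher(pem);
        while (m.find()) {
            // The MIME decoder skips the line breaks inside the body and
            // throws IllegalArgumentException if the Base64 is malformed.
            blocks.add(Base64.getMimeDecoder().decode(m.group(2)));
        }
        return blocks;
    }

    public static void main(String[] args) {
        // Tiny placeholder body, not a real certificate.
        String sample = "-----BEGIN CERTIFICATE-----\nAQID\nBA==\n-----END CERTIFICATE-----\n";
        System.out.println(decode(sample).size() + " block(s) found");
    }
}
```

A value that yields zero blocks, or that throws while decoding, was probably damaged when it was pasted into the properties file.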

Unfortunately, the SSL bundle consumer always fails with:

reactor.core.Exceptions$ErrorCallbackNotImplemented: org.apache.kafka.common.config.ConfigException: Invalid value org.springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory for configuration ssl.engine.factory.class: Class org.springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory could not be found.
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory for configuration ssl.engine.factory.class: Class org.springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:778)
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:531)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:524)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:114)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:134)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:728)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:595)
    at reactor.kafka.receiver.internals.ConsumerFactory.createConsumer(ConsumerFactory.java:34)
    at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$withHandler$24(DefaultKafkaReceiver.java:180)
    at reactor.core.publisher.MonoCallable.call(MonoCallable.java:72)
    at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:80)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8891)
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:9012)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8856)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8780)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8698)
    at com.MyService.run(MyService.java:53)
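
The trace shows the failure while Kafka parses the configuration, where the factory is given as a class-name string that has to be resolved at consumer creation time. One thing that may be worth checking, as a diagnostic sketch rather than a fix, is whether that class name is visible to the class loaders in play on the thread that subscribes. `ClassLookupCheck`, `isLoadable`, and `report` are hypothetical names introduced for illustration, and which loader Kafka actually consults here is an assumption to verify against your Kafka version.

```java
// Hypothetical diagnostic helper, for illustration only.
final class ClassLookupCheck {

    /** Returns true if the named class can be loaded by the given loader (without initializing it). */
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Reports visibility through the thread context class loader and this class's own loader. */
    static String report(String className) {
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        ClassLoader own = ClassLookupCheck.class.getClassLoader();
        return className
                + " -> context loader: " + isLoadable(className, context)
                + ", own loader: " + isLoadable(className, own);
    }

    public static void main(String[] args) {
        String name = args.length > 0
                ? args[0]
                : "org.springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory";
        System.out.println(report(name));
    }
}
```

Calling `ClassLookupCheck.report(...)` from inside the application, on the same thread that calls `subscribe`, shows whether the two loaders disagree. If they do, the problem is more likely class-loader visibility than the bundle configuration itself.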

How do I properly configure a Reactor Kafka consumer to use an SSL bundle?
