I would like to consume messages from kafka using reactor kafka and SSL bundle.
Previously, I had a consumer which looked like:
(this is working fine, I can consume messages from a secure SSL Kafka)
@Bean
public KafkaReceiver<String, String> kafkaReceiver(final MeterRegistry meterRegistry, final ObservationRegistry observationRegistry) {
final Map<String, Object> properties = new HashMap<>();
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/keystore.jks");
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keyStorePassphrase");
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks");
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "trustStorePassphrase");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "testtestfour");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testtesttfour");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
final ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(properties);
return KafkaReceiver.create(receiverOptions
.addAssignListener(p -> LOGGER.info("partitions assigned {}", p))
.addRevokeListener(p -> LOGGER.info("partitions revoked {}", p))
.consumerListener(new MicrometerConsumerListener(meterRegistry))
.withObservation(observationRegistry)
.subscription(Collections.singleton("topic")));
}
Now, I am hopin I could use SSL Bundle:
@Bean
public KafkaReceiver<String, String> kafkaReceiver(MeterRegistry meterRegistry, ObservationRegistry observationRegistry, SslBundles sslBundles) {
final Map<String, Object> properties = new HashMap<>();
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, SslBundleSslEngineFactory.class.getName());
properties.put(SslBundle.class.getName(), sslBundles.getBundle("kafka_bundle"));
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerGroup);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
final ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(properties);
return KafkaReceiver.create(receiverOptions
.addAssignListener(p -> LOGGER.info("partitions assigned {}", p))
.addRevokeListener(p -> LOGGER.info("partitions revoked {}", p))
.consumerListener(new MicrometerConsumerListener(meterRegistry))
.withObservation(observationRegistry)
.subscription(Collections.singleton("topicSource")));
}
spring.ssl.bundle.pem.kafka-bundle.keystore.private-key=[...]
spring.ssl.bundle.pem.kafka-bundle.keystore.certificate=[...]
spring.ssl.bundle.pem.kafka-bundle.truststore.certificate=[...]
The values of the three properties are in PEM format:
example
-----BEGIN CERTIFICATE-----
MIID1zCCAr+gAwIBAgIUNM5QQv8IzVQsgSmmdPQNaqyzWs4wDQYJKoZIhvcNAQEL
BQAwezELMAkGA1UEBhMCWFgxEjAQBgNVBAgMCVN0YXRlTmFtZTERMA8GA1UEBwwI
...
V0IJjcmYjEZbTvpjFKznvaFiOUv+8L7jHQ1/Yf+9c3C8gSjdUfv88m17pqYXd+Ds
HEmfmNNjht130UyjNCITmLVXyy5p35vWmdf95U3uEbJSnNVtXH8qRmN9oK9mUpDb
ngX6JBJI7fw7tXoqWSLHNiBODM88fUlQSho8
-----END CERTIFICATE-----
Unfortunately, the SSL Bundle consumer is always failing with:
reactor.core.Exceptions$ErrorCallbackNotImplemented: .apache.kafkamon.config.ConfigException: Invalid value .springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory for configuration ssl.engine.factory.class: Class .springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory could not be found.
Caused by: .apache.kafkamon.config.ConfigException: Invalid value .springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory for configuration ssl.engine.factory.class: Class .springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory could not be found.
at .apache.kafkamon.config.ConfigDef.parseType(ConfigDef.java:778)
at .apache.kafkamon.config.ConfigDef.parseValue(ConfigDef.java:531)
at .apache.kafkamon.config.ConfigDef.parse(ConfigDef.java:524)
at .apache.kafkamon.config.AbstractConfig.<init>(AbstractConfig.java:114)
at .apache.kafkamon.config.AbstractConfig.<init>(AbstractConfig.java:134)
at .apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:728)
at .apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:595)
at reactor.kafka.receiver.internals.ConsumerFactory.createConsumer(ConsumerFactory.java:34)
at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$withHandler$24(DefaultKafkaReceiver.java:180)
at reactor.core.publisher.MonoCallable.call(MonoCallable.java:72)
at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:80)
at reactor.core.publisher.Flux.subscribe(Flux.java:8891)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:9012)
at reactor.core.publisher.Flux.subscribe(Flux.java:8856)
at reactor.core.publisher.Flux.subscribe(Flux.java:8780)
at reactor.core.publisher.Flux.subscribe(Flux.java:8698)
at com.MyService.run(MyService.java:53)
May I ask how to properly configure reactor kafka consumer to leverage ssl bundle?