I have figured out how to implement observability with Spring Boot, Micrometer and OpenTelemetry. It all works fine, except that we have some services built with Spring Integration that pull a message from JMS and put it on Kafka. We can enable observability for JMS by calling setObservationRegistry on the jmsMessageListener. For Kafka you can enable it by setting setObservationEnabled to true on the ConcurrentKafkaListenerContainerFactory and on the kafkaTemplate. The only problem is that those two beans are not used by the Kafka integration. Is there any information on how to enable observability for Kafka with Spring Integration?
I have searched the Spring documentation, Stack Overflow and other sources, but I can't find any implementation of observability for Spring Integration in combination with Kafka.
I have enabled observability on the kafkaTemplate and on the concurrentKafkaListenerContainerFactory, but neither of them is used by Spring Integration. I use Spring Boot 3.4.3 and Spring Integration 6.4.2.
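For reference, the setup described above looks roughly like the sketch below. It is a minimal illustration, not the exact project code: the class name, the bean names, the `String` key/value types and the queue name `example.queue` are assumptions made for the example, and the observation flag is set through the factory's container properties.

```java
import io.micrometer.observation.ObservationRegistry;
import jakarta.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class ObservabilityConfigSketch {

    // JMS side: the listener container gets the ObservationRegistry directly.
    // The message listener itself is expected to be set later by the adapter that wraps this container.
    @Bean
    public DefaultMessageListenerContainer jmsMessageListenerContainer(
            ConnectionFactory connectionFactory, ObservationRegistry observationRegistry) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setDestinationName("example.queue"); // hypothetical queue name
        container.setObservationRegistry(observationRegistry);
        return container;
    }

    // Kafka producer side: observation is a flag on the template.
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
        template.setObservationEnabled(true);
        return template;
    }

    // Kafka consumer side: the flag lives on the container properties of the factory.
    // This only affects containers created by this factory (for example for @KafkaListener methods).
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setObservationEnabled(true);
        return factory;
    }
}
```

The listener container factory is only consulted for containers it creates itself, which matches the observation that an integration flow built from a plain ConsumerFactory never picks up this setting.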
Updated with code:
    @Bean
    public IntegrationFlow routingFlowConfig(ConsumerFactory<?, ?> kafkaConsumerFactory,
                                             ConsumerProperties consumerProperties,
                                             JsonSchemaValidator jsonSchemaValidator,
                                             KafkaRoutingMessageHeaderProvider kafkaMessageHeaderProvider,
                                             ProducerFactory<?, ?> kafkaProducerFactory) {
        return IntegrationFlow
                .from(Kafka.inboundChannelAdapter(kafkaConsumerFactory, consumerProperties))
                .log(LoggingHandler.Level.INFO, LOGGING_EVENTS, m -> "got a message")
                .log(LoggingHandler.Level.INFO, LOGGING_EVENTS, m -> "sent a message")
                .handle(Kafka.outboundChannelAdapter(kafkaProducerFactory))
                .get();
    }

    // JMS-to-Kafka flow (method body only)
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(jmsMessageListenerContainer))
            .log(LoggingHandler.Level.INFO, LOGGING_EVENTS, m -> "got a message")
            .enrichHeaders(h -> h.headerFunction(EVENT_TYPE, eventTypeHeaderProvider::determineEventType))
            .enrichHeaders(h -> h.headerFunction(KafkaHeaders.KEY, kafkaStorageMessageHeaderProvider::determineKey))
            .enrichHeaders(h -> h.headerFunction(TOPIC, kafkaStorageMessageHeaderProvider::determineTopic))
            .log(LoggingHandler.Level.INFO, LOGGING_EVENTS, m -> "sent a message")
            .handle(Kafka.outboundChannelAdapter(kafkaTemplate))
            .get();
When I change Kafka.inboundChannelAdapter to Kafka.messageDrivenChannelAdapter, the Jms.messageDrivenChannelAdapter breaks.
The kafkaTemplate works :).
I expect to see spanId and traceId in the MDC logging.
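For the Kafka.messageDrivenChannelAdapter variant mentioned above, one option that could be tried is to pass a ContainerProperties instance with observation enabled straight to the adapter, so that the listener container created for the flow carries the flag. This is only a sketch under assumptions: the class name, the bean name, the topic `example-topic`, the logging category and the `String` key/value types are made up for the example, and it has not been verified that the resulting spans end up in the MDC for the rest of the flow.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaInboundObservationSketch {

    @Bean
    public IntegrationFlow observedKafkaInboundFlow(ConsumerFactory<String, String> kafkaConsumerFactory) {
        // Hypothetical topic name; the container built by the adapter uses these properties.
        ContainerProperties containerProperties = new ContainerProperties("example-topic");
        // Same flag that is normally set via the listener container factory.
        containerProperties.setObservationEnabled(true);

        return IntegrationFlow
                .from(Kafka.messageDrivenChannelAdapter(kafkaConsumerFactory, containerProperties))
                .log(LoggingHandler.Level.INFO, "example.events", m -> "got a message")
                .get();
    }
}
```

The design difference from Kafka.inboundChannelAdapter is that the message-driven adapter is backed by a listener container, which is the component that exposes setObservationEnabled, whereas the inbound channel adapter is a polled source configured through ConsumerProperties.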