
How can I get spanId and traceId from a message received from a Kafka topic with Spring Integration - Stack Overflow


I have figured out how to implement observability with Spring Boot, Micrometer and OpenTelemetry. It all works fine, except that we have some services built with Spring Integration that pull a message from JMS and put it on Kafka. We can enable observability for JMS by calling setObservationRegistry on the jmsMessageListener. For Kafka you can enable it by setting setObservationEnabled to true on the ConcurrentKafkaListenerContainerFactory and on the kafkaTemplate. The only problem is that those two beans are not used by the Kafka integration. Is there any information on how to enable observability for Kafka with Spring Integration?

I have searched the Spring documentation, Stack Overflow and other sources, but I can't find any implementation that combines observability, Spring Integration and Kafka.

I have enabled observability on the kafkaTemplate and on the concurrentKafkaListenerContainerFactory, but neither is being used by Spring Integration. I use Spring Boot 3.4.3 and Spring Integration 6.4.2.
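
For reference, the observability setup described above looks roughly like the following. This is a minimal sketch and not the actual project code: the class name, the bean method names and the String key/value types are assumptions. Note that in Spring for Apache Kafka the flag on the listener side lives on the factory's ContainerProperties.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// Hypothetical configuration class; names and generic types are illustrative only.
@Configuration
public class KafkaObservabilityConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Enables Micrometer observations for containers created by this factory.
        factory.getContainerProperties().setObservationEnabled(true);
        return factory;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
        // Enables Micrometer observations for sends made through this template.
        template.setObservationEnabled(true);
        return template;
    }
}
```

These beans only affect components that actually use them, which is why the flows below that are built directly from a ConsumerFactory or ProducerFactory do not pick the settings up.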

Updated with the code:

@Bean
public IntegrationFlow routingFlowConfig(ConsumerFactory<?, ?> kafkaConsumerFactory,
                                         ConsumerProperties consumerProperties,
                                         JsonSchemaValidator jsonSchemaValidator,
                                         KafkaRoutingMessageHeaderProvider kafkaMessageHeaderProvider,
                                         ProducerFactory<?, ?> kafkaProducerFactory) {
    return IntegrationFlow
            .from(Kafka.inboundChannelAdapter(kafkaConsumerFactory, consumerProperties))
            .log(LoggingHandler.Level.INFO, LOGGING_EVENTS, m -> "got a message")
            .log(LoggingHandler.Level.INFO, LOGGING_EVENTS, m -> "sent a message")
            .handle(Kafka.outboundChannelAdapter(kafkaProducerFactory))
            .get();
}


return IntegrationFlow
        .from(Jms.messageDrivenChannelAdapter(jmsMessageListenerContainer))
        .log(LoggingHandler.Level.INFO, LOGGING_EVENTS, m -> "got a message")
        .enrichHeaders(h -> h.headerFunction(EVENT_TYPE, eventTypeHeaderProvider::determineEventType))
        .enrichHeaders(h -> h.headerFunction(KafkaHeaders.KEY, kafkaStorageMessageHeaderProvider::determineKey))
        .enrichHeaders(h -> h.headerFunction(TOPIC, kafkaStorageMessageHeaderProvider::determineTopic))
        .log(LoggingHandler.Level.INFO, LOGGING_EVENTS, m -> "sent a message")
        .handle(Kafka.outboundChannelAdapter(kafkaTemplate))
        .get();

When I change Kafka.inboundChannelAdapter to Kafka.messageDrivenChannelAdapter, the Jms.messageDrivenChannelAdapter breaks.

The kafkaTemplate works :).

I expect spanId and traceId to appear in the MDC logging.
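
To make clear which ids are meant: assuming the sending side propagates W3C trace context (the default propagation format in OpenTelemetry), the ids travel in a `traceparent` record header of the form `00-<traceId>-<spanId>-<flags>`. The sketch below only shows how that value breaks down; the `TraceParent` class is a hypothetical helper, not part of Spring, Micrometer or Kafka, and the span id it yields is the sender's span, which becomes the parent of any span created on the consumer side.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper holding the ids carried in a W3C "traceparent" header value. */
final class TraceParent {

    // Version "00", 32 hex chars trace id, 16 hex chars span (parent) id, 2 hex chars flags.
    private static final Pattern FORMAT =
            Pattern.compile("^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$");

    private final String traceId;
    private final String spanId;

    private TraceParent(String traceId, String spanId) {
        this.traceId = traceId;
        this.spanId = spanId;
    }

    String traceId() {
        return traceId;
    }

    String spanId() {
        return spanId;
    }

    /** Parses raw header bytes; returns null when the value is absent or malformed. */
    static TraceParent parse(byte[] headerValue) {
        if (headerValue == null) {
            return null;
        }
        String value = new String(headerValue, StandardCharsets.UTF_8).trim();
        Matcher matcher = FORMAT.matcher(value);
        if (!matcher.matches()) {
            return null;
        }
        String traceId = matcher.group(1);
        String spanId = matcher.group(2);
        // The W3C specification treats all-zero ids as invalid.
        if (isAllZeros(traceId) || isAllZeros(spanId)) {
            return null;
        }
        return new TraceParent(traceId, spanId);
    }

    private static boolean isAllZeros(String hex) {
        return hex.chars().allMatch(c -> c == '0');
    }
}
```

With the plain Kafka client, the bytes would come from something like `record.headers().lastHeader("traceparent").value()`. This is only an illustration of the header contents; what I am actually after is having the tracing instrumentation put these ids into the MDC automatically.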
