kotlin - How to control spring cloud function with aspect oriented annotations? - Stack Overflow

I have a Spring Cloud Function that reads from a Kafka topic. Here is how I define the consumer, as described here -> url

Consumer

@IdempotentKafkaConsumer
@Bean
fun readConnectionStatusEvent() : Consumer<Message<String>> {
    return Consumer<Message<String>> { message ->
        // ... message handling (elided)
    }
}

I used Spring AOP (aspect-oriented programming in Spring Boot) to run logic before and after a message is consumed. Here are the annotation and its processor:

Annotation

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class IdempotentKafkaConsumer

Annotation Processor

@Aspect
@Component
class MessageDeduplicationAspect(
    private val messageProcessor: MessageProcessor,
) {
    @Around("@annotation(IdempotentKafkaConsumer)")
    fun processMessage(joinPoint: ProceedingJoinPoint): Any? {
        try {
            val message = extractMessage(joinPoint.args)

            val messageId = CloudEventUtil.getIdOrNull(message)
            val source = CloudEventUtil.getSourceOrNull(message)?.toString()
            val channelName = message.headers[KafkaHeaders.RECEIVED_TOPIC]?.toString()

            // if any of the header values missing, just run the handler, skip idempotency check
            if (messageId == null || source == null || channelName == null) {
                return joinPoint.proceed()
            }

            // Pre-processing
            preProcess(messageId, source, channelName)

            // Execute the actual message handling
            val result = joinPoint.proceed()

            // Post-processing
            postProcess(messageId, source, channelName)

            return result
        } catch (e: Exception) {
            return null
        }
    }

    // extractMessage, preProcess and postProcess omitted
}

My problem is that when I run the integration tests (defined below), I get an error during context startup:

@ActiveProfiles("integration-test")
@Import(FakeClock::class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@AutoConfigureWebTestClient
class ApplicationTestsIT

Error

2025-03-19T19:37:55.467+01:00  WARN 9312 --- [           main] onfigReactiveWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Discovered 3 methods that would qualify as 'functional' - [public boolean org.springframework.beans.factory.support.NullBean.equals(java.lang.Object), public java.lang.String org.springframework.beans.factory.support.NullBean.toString(), public int org.springframework.beans.factory.support.NullBean.hashCode()].
 Class 'class org.springframework.beans.factory.support.NullBean' is not a FunctionalInterface.

How can I solve this problem? I don't want to extract the consumer's logic into a separate function and call it from the bean. Thanks in advance.
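
One possible reading of the error, offered as an assumption rather than a confirmed diagnosis: the `@Around` advice may be running around the `@Bean` factory method itself, which takes no arguments, instead of around each `accept` call on the returned `Consumer`. If `extractMessage` then throws on the empty argument list, the `catch` block returns `null`, which would explain why Spring ends up with a `NullBean`. A minimal sketch of one alternative, assuming the pre/post logic can be applied by decorating the returned consumer, might look like this. `DeduplicationHooks` and `idempotent` are hypothetical names for illustration only, not part of Spring or of the code above.

```kotlin
import java.util.function.Consumer

// Hypothetical stand-in for the preProcess/postProcess logic in the aspect above.
interface DeduplicationHooks<T> {
    fun before(message: T)
    fun after(message: T)
}

// Wraps a consumer so the hooks run around every accept() call,
// rather than around the one-time invocation of the @Bean factory method.
fun <T> idempotent(hooks: DeduplicationHooks<T>, delegate: Consumer<T>): Consumer<T> =
    Consumer<T> { message ->
        hooks.before(message)
        delegate.accept(message)
        hooks.after(message)
    }

fun main() {
    val log = mutableListOf<String>()

    // Hooks that only record the call order, for demonstration.
    val hooks = object : DeduplicationHooks<String> {
        override fun before(message: String) { log.add("pre:$message") }
        override fun after(message: String) { log.add("post:$message") }
    }

    // The handler logic stays inline, as it would in the @Bean method.
    val consumer = idempotent(hooks, Consumer<String> { message -> log.add("handle:$message") })

    consumer.accept("m1")
    println(log) // prints [pre:m1, handle:m1, post:m1]
}
```

With a helper along these lines, the `@Bean` method could return `idempotent(hooks, Consumer { message -> ... })`, keeping the handler body inside the bean definition. Whether this fits depends on how `preProcess` and `postProcess` are actually implemented, which the question does not show.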
