I have a Spring Cloud Function that reads from a Kafka topic. Here is how I define the consumer, as described here -> url
Consumer
@IdempotentKafkaConsumer
@Bean
fun readConnectionStatusEvent(): Consumer<Message<String>> {
    return Consumer<Message<String>> { message ->
        ...bla
    }
}
I used Spring Boot's aspect-oriented programming (AOP) support to run logic before and after a message is consumed. Here are the annotation and its processor.
Annotation
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class IdempotentKafkaConsumer
Annotation Processor
@Aspect
@Component
class MessageDeduplicationAspect(
    private val messageProcessor: MessageProcessor,
) {
    @Around("@annotation(IdempotentKafkaConsumer)")
    fun processMessage(joinPoint: ProceedingJoinPoint): Any? {
        try {
            val message = extractMessage(joinPoint.args)
            val messageId = CloudEventUtil.getIdOrNull(message)
            val source = CloudEventUtil.getSourceOrNull(message)?.toString()
            val channelName = message.headers[KafkaHeaders.RECEIVED_TOPIC]?.toString()
            // If any of the header values is missing, just run the handler and skip the idempotency check
            if (messageId == null || source == null || channelName == null) {
                return joinPoint.proceed()
            }
            // Pre-processing
            preProcess(messageId, source, channelName)
            // Execute the actual message handling
            val result = joinPoint.proceed()
            // Post-processing
            postProcess(messageId, source, channelName)
            return result
        } catch (e: Exception) {
            return null
        }
    }
My problem is that when I run the integration tests (class definition below), I get an error.
@ActiveProfiles("integration-test")
@Import(FakeClock::class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@AutoConfigureWebTestClient
class ApplicationTestsIT
Error
2025-03-19T19:37:55.467+01:00 WARN 9312 --- [ main] onfigReactiveWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Discovered 3 methods that would qualify as 'functional' - [public boolean org.springframework.beans.factory.support.NullBean.equals(java.lang.Object), public java.lang.String org.springframework.beans.factory.support.NullBean.toString(), public int org.springframework.beans.factory.support.NullBean.hashCode()].
Class 'class org.springframework.beans.factory.support.NullBean' is not a FunctionalInterface.
How can I solve this? I don't want to extract the consumer's logic into a separate function and call it from the bean. Thanks in advance.
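One possible reading of the error, sketched in plain Kotlin without Spring: the `@Around` advice matches the annotated `@Bean` factory method itself, which takes no parameters, so `joinPoint.args` would be empty at that point. If `extractMessage` throws on an empty argument list (an assumption, since its body is not shown), the `catch` block returns `null`, which would explain the `NullBean`. Here `FakeJoinPoint`, the body of `extractMessage`, and the `idempotent` decorator are hypothetical stand-ins for illustration, not Spring or project APIs.

```kotlin
import java.util.function.Consumer

// Hypothetical stand-in for ProceedingJoinPoint: the call's arguments plus a way to proceed.
class FakeJoinPoint(val args: Array<Any?>, private val target: () -> Any?) {
    fun proceed(): Any? = target()
}

// Hypothetical extractMessage: assumed to fail when no message argument is present.
fun extractMessage(args: Array<Any?>): String =
    args.filterIsInstance<String>().firstOrNull()
        ?: throw IllegalArgumentException("no message argument")

// Same shape as the advice in the question: any exception is swallowed and null is returned.
fun aroundAdvice(joinPoint: FakeJoinPoint): Any? =
    try {
        extractMessage(joinPoint.args)
        joinPoint.proceed()
    } catch (e: Exception) {
        null
    }

// Like the annotated bean method: no parameters, returns the Consumer.
fun readConnectionStatusEvent(): Consumer<String> =
    Consumer<String> { message -> println("handled $message") }

// Alternative sketch: decorate the returned Consumer so the check runs once per message.
fun <T> idempotent(seen: MutableSet<T>, delegate: Consumer<T>): Consumer<T> =
    Consumer<T> { message -> if (seen.add(message)) delegate.accept(message) }

fun main() {
    // Intercepting the factory call: args is empty, so the advice yields null instead of a Consumer.
    val bean = aroundAdvice(FakeJoinPoint(emptyArray()) { readConnectionStatusEvent() })
    println(bean) // prints "null"

    // Decorating the Consumer instead: the duplicate "a" is skipped.
    val handled = mutableListOf<String>()
    val consumer = idempotent(mutableSetOf(), Consumer<String> { handled += it })
    listOf("a", "a", "b").forEach(consumer::accept)
    println(handled) // prints "[a, b]"
}
```

If this reading is right, the deduplication logic would need to run when `accept` is invoked on the returned `Consumer`, not when the bean method is called; wrapping the returned `Consumer` keeps the handler body inside the bean definition.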