I am new to Kafka and am trying to build a decent project for my resume. I'm developing several Spring Boot applications as microservices running on Docker, and two of them use Kafka to exchange messages. Despite configuring the serializers and deserializers in my application.properties files, I'm encountering deserialization errors when consuming messages. Below are the relevant configurations and code snippets:
Producer Service (Clynic-Service) Configuration:
# application.properties
spring.application.name=Clynic-Service
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
server.port=4000
logging.level.root=info
Producer Code:
package com.beingadish.projects.clynicservice.kafka;

import com.beingadish.projects.clynicservice.Model.Patient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import patient.events.PatientEvent;

@Service
public class KafkaProducer {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
    private final KafkaTemplate<String, byte[]> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, byte[]> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendEvent(Patient patient) {
        // Build the protobuf event from the domain object
        PatientEvent patientEvent = PatientEvent.newBuilder()
                .setPatientId(patient.getId().toString())
                .setEmail(patient.getEmail())
                .setName(patient.getName())
                .setEventType("PATIENT_CREATED")
                .build();
        try {
            // Publish the serialized protobuf bytes to the "patient" topic
            kafkaTemplate.send("patient", patientEvent.toByteArray());
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged too
            log.error("Error sending patient event: {}", patientEvent, e);
        }
    }
}
Consumer Service (Analytics-Service) Configuration:
# application.properties
spring.application.name=Analytics-Service
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
Consumer Code:
package com.beingadish.projects.analyticsservice.kafka;

import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import patient.events.PatientEvent;

@Service
public class KafkaConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "patient", groupId = "analytics-service")
    public void consumeEvent(byte[] event) {
        try {
            // Parse the raw record value back into the protobuf message
            PatientEvent patientEvent = PatientEvent.parseFrom(event);
            log.info("Received Patient Event: [Patient Id={}, Patient Name={}, PatientEmail={}]",
                    patientEvent.getPatientId(),
                    patientEvent.getName(),
                    patientEvent.getEmail());
        } catch (InvalidProtocolBufferException e) {
            log.error("Error Deserializing Patient Event {}", e.getMessage());
        }
    }
}
Proto File (patient_event.proto)
syntax = "proto3";
package patient.events;
option java_multiple_files = true;
message PatientEvent {
  string patientId = 1;
  string name = 2;
  string email = 3;
  string event_type = 4;
}
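As a side note on how a message of this schema looks on the wire, here is a minimal, stdlib-only sketch. It is not the protobuf library: `WireFormatDemo` and `encodeStringField` are hypothetical names, and the encoder assumes field numbers below 16 and values shorter than 128 bytes so that tag and length each fit in one byte. The sample id is the one that appears in the error log in this post. If such bytes are decoded as text, field 1 starts with a line feed (tag 10) followed by `$` (length 36), and field 4 starts with `"` (tag 34), which seems to match the payload printed in the log.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class WireFormatDemo {

    // Hand-rolled encoding of a single proto3 string field:
    // tag byte = (fieldNumber << 3) | 2 (wire type 2 = length-delimited),
    // then a one-byte length, then the UTF-8 bytes of the value.
    static byte[] encodeStringField(int fieldNumber, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        if (fieldNumber < 1 || fieldNumber > 15 || utf8.length > 127) {
            throw new IllegalArgumentException("sketch only handles single-byte tag and length");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write((fieldNumber << 3) | 2);
        out.write(utf8.length);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] idField = encodeStringField(1, "07da774f-44d7-4636-878e-5cd97afdea7b");
        // Tag 10 is '\n' and length 36 is '$' when read as text
        System.out.println(idField[0] + " " + idField[1]); // prints "10 36"

        String asText = new String(idField, StandardCharsets.UTF_8);
        System.out.println(asText.startsWith("\n$07da774f")); // prints "true"

        byte[] typeField = encodeStringField(4, "PATIENT_CREATED");
        System.out.println((char) typeField[0]); // prints a double quote (tag 34)
    }
}
```

So a payload that begins with a line break and `$` is what one would expect to see if the record value were handed to the listener as a String instead of a byte[].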
Issue:
Despite the above configuration, I'm encountering deserialization errors when the consumer attempts to process messages. The error suggests that the consumer is unable to deserialize the incoming byte array into the expected PatientEvent object. It is somehow treating the serialized byte[] ([B) as a String. The exact error I encounter is this:
Edit (full error message):
2025-03-30T02:27:01.989Z ERROR 1 --- [Analytics-Service] [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Backoff FixedBackOff{interval=0, currentAttempts=1, maxAttempts=0} exhausted for patient-0@2

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.beingadish.projects.analyticsservice.kafka.KafkaConsumer.consumeEvent(byte[])]
Bean [com.beingadish.projects.analyticsservice.kafka.KafkaConsumer@7dccceb5]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2982) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2889) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2853) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2766) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2604) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2493) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2144) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1520) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1458) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1327) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:499) ~[spring-kafka-3.3.0.jar!/:3.3.0]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:478) ~[spring-kafka-3.3.0.jar!/:3.3.0]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:421) ~[spring-kafka-3.3.0.jar!/:3.3.0]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.3.0.jar!/:3.3.0]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.3.0.jar!/:3.3.0]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2875) ~[spring-kafka-3.3.0.jar!/:3.3.0]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:478) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:421) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2875) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    ... 10 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Failed to convert message payload '
$07da774f-44d7-4636-878e-5cd97afdea7bAadarsh Pandey Kakfa [email protected]"PATIENT_CREATED' to '[B'
    at org.springframework.messaging.converter.GenericMessageConverter.fromMessage(GenericMessageConverter.java:70) ~[spring-messaging-6.2.5.jar!/:6.2.5]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:147) ~[spring-messaging-6.2.5.jar!/:6.2.5]
    at org.springframework.kafka.listener.adapter.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:48) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.2.5.jar!/:6.2.5]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.2.5.jar!/:6.2.5]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.2.5.jar!/:6.2.5]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:71) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:474) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    ... 14 common frames omitted
Caused by: org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [byte] for value [$07da774f-44d7-4636-878e-5cd97afdea7bAadarsh Pandey Kakfa [email protected]"PATIENT_CR (truncated)...]
    at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:47) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:182) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.StringToArrayConverter.convert(StringToArrayConverter.java:72) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:41) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:182) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:165) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.messaging.converter.GenericMessageConverter.fromMessage(GenericMessageConverter.java:66) ~[spring-messaging-6.2.5.jar!/:6.2.5]
    ... 21 common frames omitted
Caused by: java.lang.NumberFormatException: For input string: "$07da774f-44d7-4636-878e-5cd97afdea7bAadarshPandeyKakfaTestkakfaTest@producer"PATIENT_CREATED"
    at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:67) ~[na:na]
    at java.base/java.lang.Integer.parseInt(Integer.java:647) ~[na:na]
    at java.base/java.lang.Byte.parseByte(Byte.java:193) ~[na:na]
    at java.base/java.lang.Byte.valueOf(Byte.java:249) ~[na:na]
    at java.base/java.lang.Byte.valueOf(Byte.java:275) ~[na:na]
    at org.springframework.util.NumberUtils.parseNumber(NumberUtils.java:195) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.StringToNumberConverterFactory$StringToNumber.convert(StringToNumberConverterFactory.java:64) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.StringToNumberConverterFactory$StringToNumber.convert(StringToNumberConverterFactory.java:50) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.GenericConversionService$ConverterFactoryAdapter.convert(GenericConversionService.java:415) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:41) ~[spring-core-6.2.5.jar!/:6.2.5]
    ... 27 common frames omitted
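To make the innermost frames of the trace easier to read (StringToArrayConverter, then StringToNumber, then Byte.valueOf), here is a simplified, stdlib-only imitation of an element-wise String to byte[] conversion. This is a sketch of what the trace appears to show, not Spring's actual code; `StringToByteArrayDemo`, `convertElementWise`, and `failsWithNumberFormat` are hypothetical names. The idea is that the text is split on commas, whitespace is stripped, and each piece is parsed as a numeric byte, so a payload that arrives as an arbitrary String cannot be converted.

```java
import java.util.Arrays;

public class StringToByteArrayDemo {

    // Simplified stand-in for an element-wise conversion:
    // split on commas, strip all whitespace, parse every piece as a numeric byte.
    static byte[] convertElementWise(String text) {
        String[] parts = text.split(",");
        byte[] out = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            // Byte.valueOf throws NumberFormatException for non-numeric input
            out[i] = Byte.valueOf(parts[i].replaceAll("\\s+", ""));
        }
        return out;
    }

    // Helper that reports whether the conversion fails the way the trace does
    static boolean failsWithNumberFormat(String text) {
        try {
            convertElementWise(text);
            return false;
        } catch (NumberFormatException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A comma-separated list of numbers converts fine
        System.out.println(Arrays.toString(convertElementWise("10, 36, 48"))); // prints "[10, 36, 48]"

        // Arbitrary text is treated as one element and cannot be parsed as a byte
        System.out.println(failsWithNumberFormat("$07da774f-44d7 PATIENT_CREATED")); // prints "true"
    }
}
```

Read this way, the NumberFormatException is only a symptom: by the time the listener argument is resolved, the record value already seems to be a java.lang.String rather than a byte[].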