I am trying to deserialize a Kafka record consumed from a Kafka topic using the code below. Even though the schema is correct, we get the error shown below. Can you suggest what else could be wrong?
Code
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificDatumReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AvroUtility {
    private static final Logger log = LoggerFactory.getLogger(AvroUtility.class);

    public SpecificDatumReader<GenericRecord> datumReader() {
        // Reader schema, supplied as a JSON string
        String valueSchemaString = "my schema in form of json string";
        Schema avroValueSchema = new Schema.Parser().parse(valueSchemaString);
        return new SpecificDatumReader<>(avroValueSchema);
    }
}
We use the AvroUtility class above in the following code:
// record: a ConsumerRecord<String, byte[]> polled from the topic
SpecificDatumReader<GenericRecord> datumReader = new AvroUtility().datumReader();
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null);
GenericRecord deserializedValue = datumReader.read(null, binaryDecoder);
We get the following error:
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:308)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at com.myapp.controller.Replicator.lambda$replicator$0(Replicator.java:116)
at java.lang.Iterable.forEach(Iterable.java:75)
at com.myapp.controller.Replicator.replicator(Replicator.java:104)
at com.myapp.SpringBootWithKafkaApplication.main(SpringBootWithKafkaApplication.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:87)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:51)
at org.springframework.boot.loader.PropertiesLauncher.main(PropertiesLauncher.java:597)
asked Mar 30 at 17:34 by Akash Rai, edited Apr 1 at 4:05
- I went through the producer code and found that the Kafka producer is configured with ("value.serializer", KafkaAvroSerializer.class.getName()), while our consumer uses ByteArrayDeserializer. Could that be the issue? If we use KafkaAvroDeserializer.class.getName() instead, can I expect the value to be JSON? – Akash Rai, commented Mar 31 at 2:23
1 Answer
Here you are forgetting one thing:
ConsumerRecord<String, byte[]> record;
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null);
GenericRecord deserializedValue = datumReader.read(null, binaryDecoder);
The binary payload you are passing into binaryDecoder includes extra bytes in front of the Avro data:
[magic byte][4-byte schema ID][Avro bytes]
The datumReader doesn't expect this header, which causes the failure. Your code would work with raw Avro, but as you said, the producer serializes with KafkaAvroSerializer, which adds this header.
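This is also a likely reason the error mentions a negative *length*. In Avro's binary encoding, a string is written as a zig-zag encoded variable-length long (its length) followed by the UTF-8 bytes, so when the reader starts at the header instead of the Avro data, header bytes get decoded as lengths. The sketch below is a standalone reimplementation of that decoding (not Avro's own `BinaryDecoder`), written only to illustrate the effect; the class name `AvroVarint` is made up.

```java
import java.io.ByteArrayInputStream;

/**
 * Illustrative sketch of how Avro's binary encoding reads an int/long:
 * a base-128 variable-length integer followed by zig-zag decoding.
 * Strings and bytes are prefixed with such a length, so a reader positioned
 * on the wrong byte can decode a negative length.
 */
final class AvroVarint {

    /** Reads one zig-zag encoded varint long from the stream. */
    static long readLong(ByteArrayInputStream in) {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            if (shift > 63) {
                throw new IllegalArgumentException("Varint is longer than 10 bytes");
            }
            b = in.read(); // -1 signals end of input
            if (b < 0) {
                throw new IllegalArgumentException("Unexpected end of input");
            }
            raw |= (long) (b & 0x7F) << shift; // low 7 bits carry the value
            shift += 7;
        } while ((b & 0x80) != 0); // high bit set: another byte follows
        return (raw >>> 1) ^ -(raw & 1); // undo the zig-zag mapping
    }
}
```

For a hypothetical schema ID of 1, the header is `00 00 00 00 01`: the first four bytes decode to 0 and the fifth decodes to -1, the value in the exception. Which field actually hits the bad byte depends on the real schema and the real schema ID, so treat this as one plausible path rather than a diagnosis.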
So, to read the payload properly, you could skip the 5-byte header:
import java.util.Arrays;

byte[] kafkaPayload = record.value();
int schemaRegistryHeaderLength = 5; // 1 magic byte + 4-byte schema ID
byte[] avroData = Arrays.copyOfRange(kafkaPayload, schemaRegistryHeaderLength, kafkaPayload.length);
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
GenericRecord deserializedValue = datumReader.read(null, binaryDecoder);
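A slightly more defensive variant of the same idea, as a sketch: instead of blindly skipping 5 bytes, check the magic byte and read the schema ID, which you could compare with the ID you expect or use to look up the writer schema. The class name `ConfluentWireFormat` is made up for illustration; the layout it parses is the Schema Registry framing described above (magic byte 0, then a 4-byte big-endian schema ID).

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Sketch: split a Schema Registry framed payload
 * ([magic byte 0x0][4-byte big-endian schema ID][Avro binary]) into its parts.
 */
final class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 1 + Integer.BYTES; // magic byte + schema ID

    final int schemaId;
    final byte[] avroPayload;

    private ConfluentWireFormat(int schemaId, byte[] avroPayload) {
        this.schemaId = schemaId;
        this.avroPayload = avroPayload;
    }

    static ConfluentWireFormat parse(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Payload too short for Schema Registry framing");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        int schemaId = buffer.getInt();
        // Everything after the header is plain Avro binary data
        byte[] avro = Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
        return new ConfluentWireFormat(schemaId, avro);
    }
}
```

You would call `ConfluentWireFormat.parse(record.value())` and pass `avroPayload` to `DecoderFactory.get().binaryDecoder(...)` as in the snippet above. If the schema ID shows the producer wrote with a different schema version than your hard-coded one, Avro can resolve the two when the reader is built with both, e.g. `new GenericDatumReader<>(writerSchema, readerSchema)`.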
If you are using Confluent, you could also use KafkaAvroDeserializer directly:
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;

KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
Map<String, Object> config = new HashMap<>(); // configure() takes a Map<String, ?>
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://yourRegistry:port");
deserializer.configure(config, false); // false = value deserializer, not key
GenericRecord deserializedRecord = (GenericRecord) deserializer.deserialize("topic", record.value());
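Following up on the comment about `ByteArrayDeserializer`: another option, sketched here under assumptions, is to configure the consumer itself with `KafkaAvroDeserializer` so that `poll()` already returns deserialized values. The bootstrap server, registry URL and group ID are placeholders, and `AvroConsumerConfig` is a made-up helper name; the property names are the standard Kafka consumer keys plus Confluent's `schema.registry.url` and `specific.avro.reader`.

```java
import java.util.Properties;

/**
 * Sketch of consumer properties that let the Kafka client undo the
 * Schema Registry framing itself, instead of using ByteArrayDeserializer.
 * All argument values are placeholders supplied by the caller.
 */
final class AvroConsumerConfig {
    static Properties build(String bootstrapServers, String schemaRegistryUrl, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Counterpart of the producer's KafkaAvroSerializer
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", schemaRegistryUrl);
        // "false": values are returned as GenericRecord, not generated SpecificRecord classes
        props.put("specific.avro.reader", "false");
        return props;
    }
}
```

These `Properties` would then be passed to a `KafkaConsumer<String, GenericRecord>`. Note that the values are Avro `GenericRecord` objects, not JSON; calling `toString()` on a `GenericRecord` gives a JSON-like rendering, which may be enough for logging.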