
java - Getting Error while deserializing message from kafka using Avro Schema - Stack Overflow


I am trying to deserialize a Kafka record consumed from a Kafka topic using the code below. Even though the schema is correct, we are facing the error shown further down. Can you suggest what else could be wrong?

Code

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificDatumReader;

public class AvroUtility {

  private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(Replicator.class);

  public SpecificDatumReader<GenericRecord> datumReader() {
    String valueSchemaString = "my schema in form of json string";
    Schema avroValueSchema = new Schema.Parser().parse(valueSchemaString);
    SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(avroValueSchema);
    return datumReader;
  }
}

We used the above AvroUtility class in the code below:

ConsumerRecord<String, byte[]> record;

BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null);
GenericRecord deserializedValue = datumReader.read(null, binaryDecoder);

We are getting the error below:

org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:308)
        at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
        at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at com.myapp.controller.Replicator.lambda$replicator$0(Replicator.java:116)
        at java.lang.Iterable.forEach(Iterable.java:75)
        at com.myapp.controller.Replicator.replicator(Replicator.java:104)
        at com.myapp.SpringBootWithKafkaApplication.main(SpringBootWithKafkaApplication.java:21)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48)
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:87)
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:51)
        at org.springframework.boot.loader.PropertiesLauncher.main(PropertiesLauncher.java:597)

  • Went through the producer code and found that the Kafka producer has been configured with ("value.serializer", KafkaAvroSerializer.class.getName()), while our consumer is using ByteArrayDeserializer. Could that be the issue? If we use KafkaAvroDeserializer.class.getName(), can I expect it to be JSON? – Akash Rai Commented Mar 31 at 2:23

1 Answer


Here you are missing one thing:

ConsumerRecord<String, byte[]> record;

BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null);
GenericRecord deserializedValue = datumReader.read(null, binaryDecoder);

The binary payload that you are passing into binaryDecoder includes extra bytes:

[Magic byte][4-byte schema ID][Avro bytes]

The datumReader doesn't expect this, thus causing the failure. This would work if you were using raw Avro, but as you said in the comment, the records are serialized with the KafkaAvroSerializer.

So, in order to read the payload properly, you could strip the Schema Registry header first:

byte[] kafkaPayload = record.value();

// Skip the 5-byte Schema Registry header: 1 magic byte + 4-byte schema ID
int schemaRegistryHeaderLength = 5;
byte[] avroData = Arrays.copyOfRange(kafkaPayload, schemaRegistryHeaderLength, kafkaPayload.length);

BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
GenericRecord deserializedValue = datumReader.read(null, binaryDecoder);
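
Optionally, here is a slightly more defensive variant of the same idea (a sketch, not part of the original answer): it validates the Confluent wire-format magic byte and reads the 4-byte schema ID before handing the remaining bytes to the decoder. It assumes the same record and datumReader variables as above.

import java.nio.ByteBuffer;
import java.util.Arrays;

byte[] kafkaPayload = record.value();

// Confluent wire format: byte 0 is the magic byte (0x0), bytes 1-4 are the schema ID.
if (kafkaPayload == null || kafkaPayload.length < 5 || kafkaPayload[0] != 0x0) {
    throw new IllegalArgumentException("Payload is not in Confluent Avro wire format");
}
// The schema ID could be logged or used to look up the writer schema in the registry.
int schemaId = ByteBuffer.wrap(kafkaPayload, 1, 4).getInt();

// Everything after the 5-byte header is the plain Avro-encoded record.
byte[] avroData = Arrays.copyOfRange(kafkaPayload, 5, kafkaPayload.length);
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
GenericRecord deserializedValue = datumReader.read(null, binaryDecoder);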

If you are using Confluent Schema Registry, you could also use the KafkaAvroDeserializer:

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

// configure() expects a Map<String, ?>, so use a Map rather than Properties
Map<String, Object> props = new HashMap<>();
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "yourRegistry:port");

KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
deserializer.configure(props, false); // false = configure as value deserializer

GenericRecord deserializedRecord = (GenericRecord) deserializer.deserialize("topic", record.value());
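
Coming back to the comment under the question: instead of reading raw bytes and decoding by hand, you could also configure the KafkaAvroDeserializer directly on the consumer, in which case values come back as GenericRecord objects (not JSON). A minimal sketch, with placeholder broker address, group id and registry URL:

import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "yourBroker:9092");   // placeholder
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "replicator-group");           // placeholder
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
consumerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "yourRegistry:port");

// Record values are now deserialized by the client itself and arrive as GenericRecord.
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(consumerProps);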