I’m working on a Quarkus application that uses the Quarkus Kafka Schema Registry Avro guide as a starting point. I have an Event Hub on Azure and a corresponding Schema Registry that already contains my movie.avsc (which is identical to my local Avro schema).
I’ve set up my configuration like this:
mp.messaging.outgoing.movies-out.topic=movies
mp.messaging.outgoing.movies-out.connector=smallrye-kafka
mp.messaging.outgoing.movies-out.value.serializer=ch.rfag.MovieSerializer
mp.messaging.outgoing.movies-out.azure.schemaregistry.url=
mp.messaging.outgoing.movies-out.azure.schemaregistry.schema-group=movies_schema
mp.messaging.outgoing.movies-out.azure.schemaregistry.connection-string=Endpoint=sb://regoheart-eventhub-sn.servicebus.windows.net/;SharedAccessKeyName=YourKeyName;SharedAccessKey=YourKeyValue
# Incoming configuration (for receiving)
mp.messaging.incoming.movies-in.topic=movies
mp.messaging.incoming.movies-in.connector=smallrye-kafka
mp.messaging.incoming.movies-in.value.deserializer=ch.rfag.MovieDeserializer
mp.messaging.incoming.movies-in.azure.schemaregistry.url=
mp.messaging.incoming.movies-in.azure.schemaregistry.schema-group=movies_schema
mp.messaging.incoming.movies-in.azure.schemaregistry.connection-string=Endpoint=sb://regoheart-eventhub-sn.servicebus.windows.net/;SharedAccessKeyName=YourKeyName;SharedAccessKey=YourKeyValue
# Enable specific record deserialization
mp.messaging.incoming.movies-in.specific.avro.reader=true
kafka.bootstrap.servers=${EVENTHUB_NAMESPACE}.servicebus.windows.net:9093
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="${EVENTHUB_CONNECTION_STRING}";
%dev.kafka.security.protocol=PLAINTEXT
%dev.kafka.sasl.mechanism=
%dev.kafka.sasl.jaas.config=
%dev.kafka.bootstrap.servers=localhost:9092
%test.kafka.security.protocol=PLAINTEXT
%test.kafka.sasl.mechanism=
%test.kafka.sasl.jaas.config=
%test.kafka.bootstrap.servers=localhost:9092
# ========================
# HTTP Configuration
# ========================
quarkus.http.host=0.0.0.0
quarkus.http.port=8079
build.gradle
plugins {
    id 'java'
    id 'io.quarkus'
}

repositories {
    mavenCentral()
    mavenLocal()
}

dependencies {
    implementation enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}")
    implementation 'io.quarkus:quarkus-resteasy-jsonb'
    implementation 'com.azure:azure-data-schemaregistry-apacheavro:1.1.24'
    implementation 'io.quarkus:quarkus-messaging-kafka'
    implementation 'io.quarkus:quarkus-avro'
    implementation 'io.quarkus:quarkus-arc'
    implementation 'io.quarkus:quarkus-resteasy'
    testImplementation 'io.quarkus:quarkus-junit5'
    testImplementation 'io.rest-assured:rest-assured'
}

group 'ch.rfag'
version '1.0-SNAPSHOT'

java {
    sourceCompatibility = JavaVersion.VERSION_21
    targetCompatibility = JavaVersion.VERSION_21
}

test {
    systemProperty "java.util.logging.manager", "org.jboss.logmanager.LogManager"
}

compileJava {
    options.encoding = 'UTF-8'
    options.compilerArgs << '-parameters'
}

compileTestJava {
    options.encoding = 'UTF-8'
}
My code (including my generated Movie class, consumer, and resource) works fine with my local Avro schema. Here’s my custom serializer (the deserializer mirrors it, using fromByteBuffer()):
package ch.rfag;

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import me.escoffier.quarkus.Movie;

public class MovieSerializer implements Serializer<Movie> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration is needed for this serializer.
    }

    @Override
    public byte[] serialize(String topic, Movie movie) {
        if (movie == null) {
            return null;
        }
        try {
            // Convert the Movie to a ByteBuffer using the generated method.
            ByteBuffer buffer = movie.toByteBuffer();
            // Create a byte array with the exact remaining size.
            byte[] bytes = new byte[buffer.remaining()];
            // Copy the contents of the ByteBuffer into the byte array.
            buffer.get(bytes);
            return bytes;
        } catch (Exception e) {
            throw new RuntimeException("Error serializing Movie", e);
        }
    }

    @Override
    public void close() {
        // No resources to close.
    }
}
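For reference, the copy step in serialize() can be checked in isolation. The sketch below uses only the JDK and no Avro or Kafka classes; the class name ByteBufferCopyDemo and the "movie" payload are hypothetical stand-ins for the buffer that movie.toByteBuffer() returns. It shows why the array is sized with remaining() rather than taken from buffer.array(): the backing array can be larger than the readable content.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferCopyDemo {

    // Copies only the readable bytes (position..limit) of the buffer,
    // the same way the serializer above does. Note that this advances
    // the buffer's position to its limit.
    public static byte[] toBytes(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        // Hypothetical payload standing in for the output of movie.toByteBuffer().
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put("movie".getBytes(StandardCharsets.UTF_8));
        buffer.flip(); // switch from writing to reading: position=0, limit=5

        byte[] copied = toBytes(buffer);
        System.out.println(copied.length);         // 5
        System.out.println(buffer.array().length); // 16 (full backing array)
        System.out.println(new String(copied, StandardCharsets.UTF_8)); // movie
    }
}
```

So the bytes I send to Kafka are exactly the Avro payload and nothing else; no schema ID or other registry information is attached at this step.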
The problem is that on Azure I have a Schema Registry containing the movie.avsc, but my current serializer and deserializer always use the locally generated Avro schema (via toByteBuffer() and fromByteBuffer()).
My Question: How can I configure my application so that it uses the movie.avsc from the Azure Schema Registry instead of the local copy? In other words, I need to integrate the Azure Schema Registry client (or its Avro serializer/deserializer) so that my messages are serialized/deserialized with the schema as stored in Azure.