
Using Azure Schema Registry with Quarkus & Kafka Schema Registry Avro – How to Use the Remote Avro Schema Instead of the Local Schema


I’m working on a Quarkus application that uses the Quarkus Kafka Schema Registry Avro guide as a starting point. I have an Event Hub on Azure and a corresponding Schema Registry that already contains my movie.avsc (which is identical to my local Avro schema).

I’ve set up my configuration like this:

mp.messaging.outgoing.movies-out.topic=movies
mp.messaging.outgoing.movies-out.connector=smallrye-kafka
mp.messaging.outgoing.movies-out.value.serializer=ch.rfag.MovieSerializer
mp.messaging.outgoing.movies-out.azure.schemaregistry.url=
mp.messaging.outgoing.movies-out.azure.schemaregistry.schema-group=movies_schema
mp.messaging.outgoing.movies-out.azure.schemaregistry.connection-string=Endpoint=sb://regoheart-eventhub-sn.servicebus.windows.net/;SharedAccessKeyName=YourKeyName;SharedAccessKey=YourKeyValue

# Incoming configuration (for receiving)
mp.messaging.incoming.movies-in.topic=movies
mp.messaging.incoming.movies-in.connector=smallrye-kafka
mp.messaging.incoming.movies-in.value.deserializer=ch.rfag.MovieDeserializer
mp.messaging.incoming.movies-in.azure.schemaregistry.url=
mp.messaging.incoming.movies-in.azure.schemaregistry.schema-group=movies_schema
mp.messaging.incoming.movies-in.azure.schemaregistry.connection-string=Endpoint=sb://regoheart-eventhub-sn.servicebus.windows.net/;SharedAccessKeyName=YourKeyName;SharedAccessKey=YourKeyValue

# Enable specific record deserialization
mp.messaging.incoming.movies-in.specific.avro.reader=true

kafka.bootstrap.servers=${EVENTHUB_NAMESPACE}.servicebus.windows.net:9093
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="${EVENTHUB_CONNECTION_STRING}";

%dev.kafka.security.protocol=PLAINTEXT
%dev.kafka.sasl.mechanism=
%dev.kafka.sasl.jaas.config=
%dev.kafka.bootstrap.servers=localhost:9092

%test.kafka.security.protocol=PLAINTEXT
%test.kafka.sasl.mechanism=
%test.kafka.sasl.jaas.config=
%test.kafka.bootstrap.servers=localhost:9092

# ========================
# HTTP Configuration
# ========================
quarkus.http.host=0.0.0.0
quarkus.http.port=8079

build.gradle

plugins {
    id 'java'
    id 'io.quarkus'
}

repositories {
    mavenCentral()
    mavenLocal()
}

dependencies {
    implementation enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}")
    implementation 'io.quarkus:quarkus-resteasy-jsonb'
    implementation 'com.azure:azure-data-schemaregistry-apacheavro:1.1.24'
    implementation 'io.quarkus:quarkus-messaging-kafka'
    implementation 'io.quarkus:quarkus-avro'
    implementation 'io.quarkus:quarkus-arc'
    implementation 'io.quarkus:quarkus-resteasy'
    testImplementation 'io.quarkus:quarkus-junit5'
    testImplementation 'io.rest-assured:rest-assured'
}

group 'ch.rfag'
version '1.0-SNAPSHOT'

java {
    sourceCompatibility = JavaVersion.VERSION_21
    targetCompatibility = JavaVersion.VERSION_21
}

test {
    systemProperty "java.util.logging.manager", "org.jboss.logmanager.LogManager"
}
compileJava {
    options.encoding = 'UTF-8'
    options.compilerArgs << '-parameters'
}

compileTestJava {
    options.encoding = 'UTF-8'
}

My code (including my generated Movie class, consumer, and resource) is working fine with my local Avro schema. Here’s a snippet of my custom serializer/deserializer:

package ch.rfag;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import me.escoffier.quarkus.Movie;

public class MovieSerializer implements Serializer<Movie> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration is needed for this serializer.
    }

    @Override
    public byte[] serialize(String topic, Movie movie) {
        if (movie == null) {
            return null;
        }
        try {
            // Convert the Movie to a ByteBuffer using the generated method.
            ByteBuffer buffer = movie.toByteBuffer();
            // Create a byte array with the exact remaining size.
            byte[] bytes = new byte[buffer.remaining()];
            // Copy the contents of the ByteBuffer into the byte array.
            buffer.get(bytes);
            return bytes;
        } catch (Exception e) {
            throw new RuntimeException("Error serializing Movie", e);
        }
    }

    @Override
    public void close() {
        // No resources to close.
    }
}
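
The deserializer is the mirror image, built on the generated fromByteBuffer() method (same pattern as the serializer above):

package ch.rfag;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.kafka.common.serialization.Deserializer;
import me.escoffier.quarkus.Movie;

public class MovieDeserializer implements Deserializer<Movie> {

    @Override
    public Movie deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            // Rebuild the Movie from the raw Avro bytes using the generated method,
            // which decodes against the locally generated (compile-time) schema.
            return Movie.fromByteBuffer(ByteBuffer.wrap(data));
        } catch (IOException e) {
            throw new RuntimeException("Error deserializing Movie", e);
        }
    }
}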

The problem is that on Azure I have a Schema Registry containing the movie.avsc, but my current serializer and deserializer always use the locally generated Avro schema (via toByteBuffer() and fromByteBuffer()).

My Question: How can I configure my application so that it uses the movie.avsc from the Azure Schema Registry instead of the local copy? In other words, I need to integrate the Azure Schema Registry client (or its Avro serializer/deserializer) so that my messages are serialized/deserialized with the schema as stored in Azure.
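
What I imagine I need is something along these lines, wrapping the SDK's SchemaRegistryApacheAvroSerializer inside a Kafka Serializer (untested sketch; the class name AzureMovieSerializer and the way I read my azure.schemaregistry.* keys out of the Kafka config are my own invention, and it assumes a com.azure:azure-identity dependency for the credential):

package ch.rfag;

import java.util.Map;
import com.azure.core.models.MessageContent;
import com.azure.core.util.serializer.TypeReference;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializer;
import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializerBuilder;
import com.azure.identity.DefaultAzureCredentialBuilder;
import org.apache.kafka.common.serialization.Serializer;
import me.escoffier.quarkus.Movie;

public class AzureMovieSerializer implements Serializer<Movie> {

    private SchemaRegistryApacheAvroSerializer serializer;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // The azure.schemaregistry.* keys are my own channel attributes; I am
        // assuming SmallRye passes them through to the Kafka serializer config.
        String namespace = (String) configs.get("azure.schemaregistry.url");
        String group = (String) configs.get("azure.schemaregistry.schema-group");
        serializer = new SchemaRegistryApacheAvroSerializerBuilder()
                .schemaRegistryClient(new SchemaRegistryClientBuilder()
                        // e.g. "regoheart-eventhub-sn.servicebus.windows.net"
                        .fullyQualifiedNamespace(namespace)
                        // Schema Registry authenticates with Azure AD credentials,
                        // not the Event Hubs SAS connection string.
                        .credential(new DefaultAzureCredentialBuilder().build())
                        .buildAsyncClient())
                .schemaGroup(group)
                .buildSerializer();
    }

    @Override
    public byte[] serialize(String topic, Movie movie) {
        if (movie == null) {
            return null;
        }
        // Resolves the schema against the registry and encodes the record.
        MessageContent content = serializer.serialize(
                movie, TypeReference.createInstance(MessageContent.class));
        // Caveat: the schema ID travels in content.getContentType() (e.g.
        // "avro/binary+<schemaId>"), so for Kafka it presumably has to be
        // carried separately, e.g. in a record header via
        // serialize(topic, headers, data).
        return content.getBodyAsBinaryData().toBytes();
    }
}

If this is the right direction, I assume the deserializer would mirror it with deserialize(MessageContent, TypeReference) and the builder's avroSpecificReader(true) flag, which would replace my Confluent-style specific.avro.reader property. Is this the intended integration, or is there a ready-made Quarkus/SmallRye binding for the Azure registry?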
