最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

java - quarkus-messaging-amqp CloudEvent with extentions - Stack Overflow

programmeradmin2浏览0评论

I am creating a Quarkus service that accepts SOAP request and puts the data in a AMQP queue following CloudEvent specification. I want to add an additional metadata parameter (extension), but in the end the message does not have it, and I fail to understand why and how to make it work (Quarkus is new to me).

The code is as follows:

Messaging service (with experimentation methods that adds messages on start, and consumes and prints messages from the queue)

package com.example.messaging;

import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.ce.CloudEventMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import com.example.domain.Event;
import .eclipse.microprofile.reactive.messaging.*;
import .jboss.logging.Logger;

import java.time.ZonedDateTime;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class MessagingService {

    private static final Logger LOG = Logger.getLogger(MessagingService.class);

    @Inject
    @Channel("events-out")
    Emitter<Event> emitter;

    @SuppressWarnings("unchecked")
    public void send(Event event, String actionType) {
        LOG.debug("Sending event with event number [%s]".formatted(event.getNumber()));
        var message = Message.of(event);
        var metadata = message.getMetadata(OutgoingCloudEventMetadata.class)
                .orElseGet(() -> OutgoingCloudEventMetadata.builder().build());
        message.addMetadata(OutgoingCloudEventMetadata.from(metadata)
                .withExtension("action_type", actionType) // FIXME Does not work. Parameter is missing in the message.
                .build());
        emitter.send(message);
    }

    void onStart(@Observes StartupEvent ev) {
        var event1 = new Event();
        event1.setNumber(1);
        var event2 = new Event();
        event2.setNumber(2);
        send(event1, "test");
        send(event2, "test");
    }

    @Incoming("events-in")
    public CompletionStage<Void> printMessage(Message<Event> message) {
        System.out.println(message.getPayload());
        message.getMetadata(CloudEventMetadata.class)
                .ifPresent(metadata -> {
                    System.out.println("CloudEvent ID: " + metadata.getId());
                    System.out.println("CloudEvent Type: " + metadata.getType());
                    System.out.println("CloudEvent Source: " + metadata.getSource());
                    System.out.println("CloudEvent Timestamp: " + metadata.getTimeStamp());
                    System.out.println("CloudEvent content type: " + metadata.getDataContentType());
                    System.out.println("CloudEvent spec version: " + metadata.getSpecVersion());
                    System.out.println("CloudEvent extensions: " + metadata.getExtensions());
                });

        return message.ack();
    }
}

Dummy domain class

package com.example.domain;

public class Event {

    private Integer number;

    public Integer getNumber() {
        return number;
    }

    public void setNumber(Integer number) {
        this.number = number;
    }
}

build.gradle.kts

plugins {
    java
    id("io.quarkus")
}

repositories {
    mavenCentral()
    mavenLocal()
}

val quarkusPlatformGroupId: String by project
val quarkusPlatformArtifactId: String by project
val quarkusPlatformVersion: String by project

dependencies {
    implementation(enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}"))
    implementation(enforcedPlatform("${quarkusPlatformGroupId}:quarkus-cxf-bom:${quarkusPlatformVersion}"))

    implementation("io.quarkus:quarkus-resteasy")
    implementation("io.quarkiverse.cxf:quarkus-cxf:1.0.1")
    implementation("io.quarkus:quarkus-arc")
    implementation("io.quarkus:quarkus-messaging-amqp")

    testImplementation("io.quarkus:quarkus-junit5")
    testImplementation("io.rest-assured:rest-assured")
}

group = "com.example"
version = "1.0-SNAPSHOT"

java {
    sourceCompatibility = JavaVersion.VERSION_21
    targetCompatibility = JavaVersion.VERSION_21
}

tasks.withType<Test> {
    systemProperty("java.util.logging.manager", ".jboss.logmanager.LogManager")
}
tasks.withType<JavaCompile> {
    options.encoding = "UTF-8"
    optionspilerArgs.add("-parameters")
}

application.properties

mp.messaging.outgoing.events-out.address=events
mp.messaging.outgoing.events-out.cloud-events-type=com.example.service.event
mp.messaging.outgoing.events-out.cloud-events-source=/source
mp.messaging.outgoing.events-out.connector=smallrye-amqp

mp.messaging.incoming.events-in.address=events

Edit: reduced amount of code

发布评论

评论列表(0)

  1. 暂无评论