I am creating a Quarkus service that accepts SOAP request and puts the data in a AMQP queue following CloudEvent specification. I want to add an additional metadata parameter (extension), but in the end the message does not have it, and I fail to understand why and how to make it work (Quarkus is new to me).
The code is as follows:
Messaging service (with experimentation methods that adds messages on start, and consumes and prints messages from the queue)
package com.example.messaging;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.ce.CloudEventMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import com.example.domain.Event;
import .eclipse.microprofile.reactive.messaging.*;
import .jboss.logging.Logger;
import java.time.ZonedDateTime;
import java.util.concurrent.CompletionStage;
@ApplicationScoped
public class MessagingService {
private static final Logger LOG = Logger.getLogger(MessagingService.class);
@Inject
@Channel("events-out")
Emitter<Event> emitter;
@SuppressWarnings("unchecked")
public void send(Event event, String actionType) {
LOG.debug("Sending event with event number [%s]".formatted(event.getNumber()));
var message = Message.of(event);
var metadata = message.getMetadata(OutgoingCloudEventMetadata.class)
.orElseGet(() -> OutgoingCloudEventMetadata.builder().build());
message.addMetadata(OutgoingCloudEventMetadata.from(metadata)
.withExtension("action_type", actionType) // FIXME Does not work. Parameter is missing in the message.
.build());
emitter.send(message);
}
void onStart(@Observes StartupEvent ev) {
var event1 = new Event();
event1.setNumber(1);
var event2 = new Event();
event2.setNumber(2);
send(event1, "test");
send(event2, "test");
}
@Incoming("events-in")
public CompletionStage<Void> printMessage(Message<Event> message) {
System.out.println(message.getPayload());
message.getMetadata(CloudEventMetadata.class)
.ifPresent(metadata -> {
System.out.println("CloudEvent ID: " + metadata.getId());
System.out.println("CloudEvent Type: " + metadata.getType());
System.out.println("CloudEvent Source: " + metadata.getSource());
System.out.println("CloudEvent Timestamp: " + metadata.getTimeStamp());
System.out.println("CloudEvent content type: " + metadata.getDataContentType());
System.out.println("CloudEvent spec version: " + metadata.getSpecVersion());
System.out.println("CloudEvent extensions: " + metadata.getExtensions());
});
return message.ack();
}
}
Dummy domain class
package com.example.domain;
public class Event {
private Integer number;
public Integer getNumber() {
return number;
}
public void setNumber(Integer number) {
this.number = number;
}
}
build.gradle.kts
plugins {
java
id("io.quarkus")
}
repositories {
mavenCentral()
mavenLocal()
}
val quarkusPlatformGroupId: String by project
val quarkusPlatformArtifactId: String by project
val quarkusPlatformVersion: String by project
dependencies {
implementation(enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}"))
implementation(enforcedPlatform("${quarkusPlatformGroupId}:quarkus-cxf-bom:${quarkusPlatformVersion}"))
implementation("io.quarkus:quarkus-resteasy")
implementation("io.quarkiverse.cxf:quarkus-cxf:1.0.1")
implementation("io.quarkus:quarkus-arc")
implementation("io.quarkus:quarkus-messaging-amqp")
testImplementation("io.quarkus:quarkus-junit5")
testImplementation("io.rest-assured:rest-assured")
}
group = "com.example"
version = "1.0-SNAPSHOT"
java {
sourceCompatibility = JavaVersion.VERSION_21
targetCompatibility = JavaVersion.VERSION_21
}
tasks.withType<Test> {
systemProperty("java.util.logging.manager", ".jboss.logmanager.LogManager")
}
tasks.withType<JavaCompile> {
options.encoding = "UTF-8"
optionspilerArgs.add("-parameters")
}
application.properties
mp.messaging.outgoing.events-out.address=events
mp.messaging.outgoing.events-out.cloud-events-type=com.example.service.event
mp.messaging.outgoing.events-out.cloud-events-source=/source
mp.messaging.outgoing.events-out.connector=smallrye-amqp
mp.messaging.incoming.events-in.address=events
Edit: reduced amount of code