I publish a job on JetStream and want to have a mechanism for waiting for the job result. Something like:
public <T extends JobRequest> CompletableFuture<Void> publishJobRequestWithReply(final T jobRequest) throws Exception {
final var subject = subjectOf(jobRequest);
var replyTo = NUID.nextGlobal();
final var data = mapper.writeValueAsBytes(
new JobRequestEventData<>(jobRequest, replyTo));
var replyToSub = nc.subscribe(replyTo); // Core NATS does not require pre-configured streams, making it perfect for temporary reply subjects.
jetStream.publishAsync(subject, data);
return CompletableFuture.runAsync(() -> {
while (true) {
try {
var msg = replyToSub.nextMessage(Duration.ofSeconds(1));
if (msg == null) {
continue;
}
System.out.println("Reply: " + msg);
break;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
}
}
replyToSub.unsubscribe();
});
Now, the resulting CompletableFuture
is using the Core NATS, and not JetStreams (as no stream is configured for this). Above code works, but I would like to use the JetStream, to have persistent result (as I would like to be sure it comes back).
Should I create a result stream for the results? Or would above code be enough (robust enough)?