I have Redis cluster with 6 nodes deployed in my k8s environment. The deployment environment can be on GKE or a private cloud on Openshift.
Deployed Quarkus application using the Redis Client extension. I'm using a ReactiveRedisDataSource to register for a subscription for expired key events, e.g. ReactivePubSubCommands. Im using mutiny for reactive processing of these events.
However I find that I receive some of the expired key events but not all of them. Is there an known issue here? Do I need to register explicitly against each host in the cluster?
This is the pipeline of the workflow; which work when a message arrives.
private void subscribe() {
pubSubCommands.subscribeToPatterns(pushConfig.keyPattern())
.select()
.where(msg -> msgPattern.matcher(msg).matches())
.onItem()
.invoke(expiredCacheKVCallback::expiredMessageKey)
.onFailure()
.invoke(err -> {
logger.error("Valkey subscription error. Attempting to reconnect...", err);
scheduleReconnect();
})
.subscribe()
.with(msg -> logger.infof("New message has been processed %s.", msg),
err -> logger.error("Error while listening for new events from the cache ", err));
}
Any hints or suggestions?