Tested with paho version 1.2.5
In this example, I am sending a message to the destination root/msg/1/data. I have two subscriptions: root/msg/1/# and root/msg/+/#. Both subscriptions match the message sent to root/msg/1/data. I expected the message listener to be called once for each subscription. However, to my surprise, the message listener was called twice for each subscription.
import java.nio.charset.StandardCharsets;

import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;

public class MqttPahoExample {
    public static void main(String[] args) throws MqttException {
        MqttConnectionOptionsBuilder builder = new MqttConnectionOptionsBuilder();
        MqttConnectionOptions mqttConnectionOptions = builder.automaticReconnect(true)
                .username("user")
                .password("password".getBytes())
                .cleanStart(false)
                .requestResponseInfo(true)
                .build();
        mqttConnectionOptions.setSendReasonMessages(false);

        MqttClient mqttClient = new MqttClient("tcp://localhost:61616", "Client-01");
        mqttClient.connect(mqttConnectionOptions);

        // Two subscriptions, both will match the incoming data
        MqttSubscription mqttSubscription1 = new MqttSubscription("root/msg/1/#", 2);
        MqttSubscription mqttSubscription2 = new MqttSubscription("root/msg/+/#", 2);

        // The same listener instance is registered for both subscriptions
        IMqttMessageListener iMqttMessageListener = (topic, message1) ->
                System.out.println("topic:" + topic
                        + " message:" + new String(message1.getPayload(), StandardCharsets.UTF_8)
                        + " id:" + message1.getId());
        MqttSubscription[] mqttSubscriptions = {mqttSubscription1, mqttSubscription2};
        IMqttMessageListener[] mqttMessageListeners = {iMqttMessageListener, iMqttMessageListener};
        mqttClient.subscribe(mqttSubscriptions, mqttMessageListeners);

        // Publish message
        MqttMessage message = new MqttMessage("TestMessage".getBytes());
        message.setQos(2);
        mqttClient.publish("root/msg/1/data", message);
    }
}
Expected output
topic:root/msg/1/data message:TestMessage id:1
topic:root/msg/1/data message:TestMessage id:1
Actual output
topic:root/msg/1/data message:TestMessage id:2
topic:root/msg/1/data message:TestMessage id:2
topic:root/msg/1/data message:TestMessage id:1
topic:root/msg/1/data message:TestMessage id:1
I was expecting to receive 1 message for each subscription.
EDIT: I was using Paho MQTT 5, and as suggested by @Brits in the answer below, I tried the same example with subscription identifiers. I was still getting an extra copy of the message for each overlapping subscription. It seems to depend mostly on how each broker implementation interprets this sentence from the specification:
Server MAY deliver further copies of the message, one for each additional matching subscription and respecting the subscription’s QoS in each case.
So I tried different MQTT brokers:
- Artemis - output
root/msg/1/# topic:root/msg/1/data message:TestMessage id:1
root/msg/+/# topic:root/msg/1/data message:TestMessage id:1
root/msg/1/# topic:root/msg/1/data message:TestMessage id:2
root/msg/+/# topic:root/msg/1/data message:TestMessage id:2
- Mosquitto - output
root/msg/1/# topic:root/msg/1/data message:TestMessage id:1
root/msg/+/# topic:root/msg/1/data message:TestMessage id:2
Code using subscription identifiers:
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class MqttPahoFinalAsync {
    public static void main(String[] args) throws MqttException {
        MqttConnectionOptionsBuilder builder = new MqttConnectionOptionsBuilder();
        MqttConnectionOptions mqttConnectionOptions = builder.automaticReconnect(true)
                .username("user")
                .password("password".getBytes())
                .cleanStart(true)
                .requestReponseInfo(true)
                .build();
        mqttConnectionOptions.setUseSubscriptionIdentifiers(true);

        MqttAsyncClient mqttClient = new MqttAsyncClient("tcp://localhost:1883", "Client-01");
        mqttClient.connect(mqttConnectionOptions).waitForCompletion();

        // 2 subscriptions, both will match the incoming data

        // subscription 1 (subscription identifier 1)
        MqttProperties subProperties1 = new MqttProperties();
        subProperties1.setSubscriptionIdentifiers(List.of(0)); // paho forces to have it initialised like this
        subProperties1.setSubscriptionIdentifier(1);
        MqttSubscription mqttSubscription1 = new MqttSubscription("root/msg/1/#", 2);
        IMqttMessageListener iMqttMessageListener1 = (topic, message1) ->
                System.out.println("root/msg/1/# topic:" + topic
                        + " message:" + new String(message1.getPayload(), StandardCharsets.UTF_8)
                        + " id:" + message1.getId());
        mqttClient.subscribe(mqttSubscription1, null, null, iMqttMessageListener1, subProperties1).waitForCompletion();

        // subscription 2 (subscription identifier 2)
        MqttProperties subProperties2 = new MqttProperties();
        subProperties2.setSubscriptionIdentifiers(List.of(0)); // paho forces to have it initialised like this
        subProperties2.setSubscriptionIdentifier(2);
        MqttSubscription mqttSubscription2 = new MqttSubscription("root/msg/+/#", 2);
        IMqttMessageListener iMqttMessageListener2 = (topic, message1) ->
                System.out.println("root/msg/+/# topic:" + topic
                        + " message:" + new String(message1.getPayload(), StandardCharsets.UTF_8)
                        + " id:" + message1.getId());
        mqttClient.subscribe(mqttSubscription2, null, null, iMqttMessageListener2, subProperties2).waitForCompletion();

        // Publish message
        MqttMessage message = new MqttMessage("TestMessage".getBytes());
        message.setQos(2);
        mqttClient.publish("root/msg/1/data", message);
    }
}
Asked Feb 6 at 23:30 by Arun Singh; edited 2 days ago by Brits
4 Comments
- There isn't a question here, just a statement – hardillb Commented Feb 7 at 8:21
- "I was still getting extra copy of message for each overlapping subscription" this is expected, however the subscription identifier passed in the "PUBLISH" will enable you to tell which subscription(s) the message relates to and process it accordingly (this article may help). – Brits Commented 2 days ago
- Thanks for highlighting that, @Brits. I tried printing the subscription identifiers that arrive with each message. The article says: "If the server chooses to send only one message for overlapping subscriptions, the PUBLISH packet will contain multiple Subscription Identifiers." In my case, with Paho MQTT 5 and the Artemis broker, I receive one copy for each overlapping subscription, so I expected a single subscription identifier in each message. But I actually got subscriptionIdentifiers=[1, 2] for all 4 messages. – Arun Singh Commented 2 days ago
- OK, that's unexpected. Artemis supports multiple protocols, so I guess its MQTT implementation may be incomplete. This does not look right (based on a really quick scan of the code!). – Brits Commented 2 days ago
1 Answer
The MQTT V3 spec allows the server to send the message once, or multiple times:
When Clients make subscriptions with Topic Filters that include wildcards, it is possible for a Client’s subscriptions to overlap so that a published message might match multiple filters. In this case the Server MUST deliver the message to the Client respecting the maximum QoS of all the matching subscriptions [MQTT-3.3.4-2]. In addition, the Server MAY deliver further copies of the message, one for each additional matching subscription and respecting the subscription’s QoS in each case.
(V5 is similar but includes the concept of a "Subscription Identifier" which I cover below).
In your case you are subscribing with two overlapping topic filters; when the message is received, the server sends the PUBLISH to your client twice (with message IDs 1 and 2 in this case). If you used a different server then you might see a different result (as this is something the spec allows, but does not require).
Each of those two messages is passed to each listener, so you get four lines of output.
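The arithmetic behind the four lines can be sketched without a broker. This is a simplified model, not Paho's actual dispatch code: the class OverlapDemo and its methods are made-up names, and the matcher ignores edge cases such as topics starting with $. It assumes the client hands every received PUBLISH to every listener whose filter matches the topic.

```java
import java.util.List;

// Illustrative model only: class and method names are hypothetical, not Paho API.
class OverlapDemo {

    // Simplified MQTT topic-filter match: '+' matches one level, '#' matches the rest.
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true;   // multi-level wildcard: this level and everything below
            }
            if (i >= t.length) {
                return false;  // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;  // literal level differs
            }
        }
        return f.length == t.length;
    }

    // Counts listener calls when the broker sends `copies` PUBLISH packets and the
    // client passes each packet to every listener whose filter matches the topic.
    static int listenerCalls(List<String> filters, String topic, int copies) {
        int calls = 0;
        for (int c = 0; c < copies; c++) {
            for (String filter : filters) {
                if (matches(filter, topic)) {
                    calls++;
                }
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String> filters = List.of("root/msg/1/#", "root/msg/+/#");
        String topic = "root/msg/1/data";
        System.out.println(listenerCalls(filters, topic, 2)); // prints 4: one copy per subscription
        System.out.println(listenerCalls(filters, topic, 1)); // prints 2: broker sends a single copy
    }
}
```

With two copies from the broker and two matching filters on the client, the model gives 2 x 2 = 4 callbacks, which matches the output in the question. A broker that sends only one copy would give 2.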
If you want to avoid this then consider using MQTT V5 with subscription identifiers. This would enable your message handlers to ignore any messages not relating to the specific SUBSCRIBE request.
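As a rough sketch of that idea, each handler can check whether the identifiers carried by a PUBLISH include its own before doing any work. The code below is a plain-Java model with hypothetical names (SubscriptionIdFilterDemo, shouldHandle, handlerCalls); it does not use the Paho API, and how you read the identifiers from a received message depends on your client library.

```java
import java.util.List;

// Illustrative model only: names are hypothetical, not Paho API.
class SubscriptionIdFilterDemo {

    // True when a handler registered under subscriptionId should process a PUBLISH
    // that carried the given list of subscription identifiers.
    static boolean shouldHandle(int subscriptionId, List<Integer> idsOnPublish) {
        return idsOnPublish.contains(subscriptionId);
    }

    // Offers every received PUBLISH (represented by its identifier list) to every
    // handler and counts how many handlers actually process it.
    static int handlerCalls(List<List<Integer>> publishes, List<Integer> handlerIds) {
        int calls = 0;
        for (List<Integer> idsOnPublish : publishes) {
            for (int handlerId : handlerIds) {
                if (shouldHandle(handlerId, idsOnPublish)) {
                    calls++;
                }
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        List<Integer> handlers = List.of(1, 2);
        // Broker sends one copy per subscription, each tagged with a single identifier.
        System.out.println(handlerCalls(List.of(List.of(1), List.of(2)), handlers)); // prints 2
        // Broker tags both copies with both identifiers.
        System.out.println(handlerCalls(List.of(List.of(1, 2), List.of(1, 2)), handlers)); // prints 4
    }
}
```

The second case mirrors what the comments report for Artemis, where all copies carried subscriptionIdentifiers=[1, 2]. In that situation the identifier check alone does not remove the duplicates, so the handler would need some other way to recognise a repeated message.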