java - Duplicate messages received if multiple subscription matches with incoming message - Stack Overflow

Tested with paho version 1.2.5

In this example, I am sending a message to the destination root/msg/1/data. I have two subscriptions: root/msg/1/# and root/msg/+/#. Both subscriptions match the message sent to root/msg/1/data. I expected the message listener to be called once for each subscription. However, to my surprise, the message listener was called twice for each subscription.

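import java.nio.charset.StandardCharsets;

import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;

public class MqttPahoExample {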

    public static void main(String[] args) throws MqttException {
        MqttConnectionOptionsBuilder builder = new MqttConnectionOptionsBuilder();
        MqttConnectionOptions mqttConnectionOptions = builder.automaticReconnect(true)
                .username("user")
                .password("password".getBytes())
                .cleanStart(false)
                .requestResponseInfo(true)
                .build();
        mqttConnectionOptions.setSendReasonMessages(false);

        MqttClient mqttClient = new MqttClient("tcp://localhost:61616", "Client-01");
        mqttClient.connect(mqttConnectionOptions);

        // Two subscriptions, both will match the incoming data
        MqttSubscription mqttSubscription1 = new MqttSubscription("root/msg/1/#", 2);
        MqttSubscription mqttSubscription2 = new MqttSubscription("root/msg/+/#", 2);
        IMqttMessageListener iMqttMessageListener = (topic, message1) ->
                System.out.println("topic:" + topic + "  message:" + new String(message1.getPayload(), StandardCharsets.UTF_8) + " id:" + message1.getId());

        MqttSubscription[] mqttSubscriptions = {mqttSubscription1, mqttSubscription2};
        IMqttMessageListener[] mqttMessageListeners = {iMqttMessageListener, iMqttMessageListener};
        mqttClient.subscribe(mqttSubscriptions, mqttMessageListeners);

        // Publish message
        MqttMessage message = new MqttMessage("TestMessage".getBytes());
        message.setQos(2);
        mqttClient.publish("root/msg/1/data", message);
    }
}

Expected output

topic:root/msg/1/data  message:TestMessage id:1
topic:root/msg/1/data  message:TestMessage id:1

Actual output

topic:root/msg/1/data  message:TestMessage id:2
topic:root/msg/1/data  message:TestMessage id:2
topic:root/msg/1/data  message:TestMessage id:1
topic:root/msg/1/data  message:TestMessage id:1

I was expecting to receive 1 message for each subscription.


EDIT: I was using Paho MQTT 5 and, as suggested by @Brits in the answer below, I tried the same example with subscription identifiers. I still received an extra copy of the message for each overlapping subscription. It seems this depends mostly on how each broker implementation interprets this sentence from the specification:

Server MAY deliver further copies of the message, one for each additional matching subscription and respecting the subscription’s QoS in each case.

So I tried different MQTT brokers:

  1. Artemis - output
root/msg/1/#  topic:root/msg/1/data  message:TestMessage id:1
root/msg/+/#  topic:root/msg/1/data  message:TestMessage id:1
root/msg/1/#  topic:root/msg/1/data  message:TestMessage id:2
root/msg/+/#  topic:root/msg/1/data  message:TestMessage id:2
  2. Mosquitto - output
root/msg/1/#  topic:root/msg/1/data  message:TestMessage id:1
root/msg/+/#  topic:root/msg/1/data  message:TestMessage id:2

Code using subscription identifiers:

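import java.nio.charset.StandardCharsets;
import java.util.List;

import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class MqttPahoFinalAsync {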

    public static void main(String[] args) throws MqttException {
        MqttConnectionOptionsBuilder builder = new MqttConnectionOptionsBuilder();
        MqttConnectionOptions mqttConnectionOptions = builder.automaticReconnect(true)
                .username("user")
                .password("password".getBytes())
                .cleanStart(true)
                .requestReponseInfo(true)
                .build();
        mqttConnectionOptions.setUseSubscriptionIdentifiers(true);

        MqttAsyncClient mqttClient = new MqttAsyncClient("tcp://localhost:1883", "Client-01");
        mqttClient.connect(mqttConnectionOptions).waitForCompletion();

        // 2 subscriptions, both will match the incoming data

        // subscription 1
        MqttProperties subProperties1 = new MqttProperties();
        subProperties1.setSubscriptionIdentifiers(List.of(0)); // paho forces to have it initialised like this
        subProperties1.setSubscriptionIdentifier(1);

        MqttSubscription mqttSubscription1 = new MqttSubscription("root/msg/1/#", 2);
        IMqttMessageListener iMqttMessageListener1 = (topic, message1) ->
                System.out.println("root/msg/1/#  topic:" + topic + "  message:" + new String(message1.getPayload(), StandardCharsets.UTF_8) + " id:" + message1.getId());

        mqttClient.subscribe(mqttSubscription1, null, null, iMqttMessageListener1, subProperties1).waitForCompletion();

        // subscription 2
        MqttProperties subProperties2 = new MqttProperties();
        subProperties2.setSubscriptionIdentifiers(List.of(0)); // paho forces to have it initialised like this
        subProperties2.setSubscriptionIdentifier(2);
        MqttSubscription mqttSubscription2 = new MqttSubscription("root/msg/+/#", 2);

        IMqttMessageListener iMqttMessageListener2 = (topic, message1) ->
                System.out.println("root/msg/+/#  topic:" + topic + "  message:" + new String(message1.getPayload(), StandardCharsets.UTF_8) + " id:" + message1.getId());

        mqttClient.subscribe(mqttSubscription2, null, null, iMqttMessageListener2, subProperties2).waitForCompletion();

        // Publish message
        MqttMessage message = new MqttMessage("TestMessage".getBytes());
        message.setQos(2);
        mqttClient.publish("root/msg/1/data", message);
    }
}

asked Feb 6 at 23:30 by Arun Singh, edited 2 days ago by Brits
  • There isn't a question here, just a statement – hardillb Commented Feb 7 at 8:21
  • "I was still getting extra copy of message for each overlapping subscription": this is expected. However, the subscription identifier passed in the PUBLISH will let you tell which subscription(s) the message relates to and process it accordingly (this article may help). – Brits Commented 2 days ago
  • Thanks for highlighting that, @Brits. I tried printing the subscription identifiers that arrive with each message. The article says: "If the server chooses to send only one message for overlapping subscriptions, the PUBLISH packet will contain multiple Subscription Identifiers." In my case (Paho MQTT 5 with the Artemis broker) I receive one copy per overlapping subscription, so I expected a single subscription identifier in each message. Instead I got subscriptionIdentifiers=[1, 2] for all 4 messages. – Arun Singh Commented 2 days ago
  • OK, that's unexpected. Artemis supports multiple protocols, so I guess its MQTT implementation may be incomplete. This does not look right (based on a really quick scan of the code!). – Brits Commented 2 days ago

1 Answer

The MQTT V3 spec allows the server to send the message once, or multiple times:

When Clients make subscriptions with Topic Filters that include wildcards, it is possible for a Client’s subscriptions to overlap so that a published message might match multiple filters. In this case the Server MUST deliver the message to the Client respecting the maximum QoS of all the matching subscriptions [MQTT-3.3.4-2]. In addition, the Server MAY deliver further copies of the message, one for each additional matching subscription and respecting the subscription’s QoS in each case.

(V5 is similar but includes the concept of a "Subscription Identifier" which I cover below).

In your case you are subscribing with two overlapping topic filters, so when the message is published the server sends the PUBLISH to your client twice (with message IDs 1 and 2 in this case). With a different server you might see a different result, as this is something the spec allows but does not require.
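
The two behaviours the spec permits can be illustrated with a small model. This is only a sketch: OverlapDelivery and copies are hypothetical names, not part of any broker or of Paho, and it assumes the usual rule that a delivered copy uses the lower of the publish QoS and the subscription QoS.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model only; the class and method names are hypothetical, not a broker API. */
class OverlapDelivery {

    /**
     * QoS of each PUBLISH copy a broker sends to one client for one message.
     *
     * @param publishQos          QoS of the original publication
     * @param matchingSubQos      QoS of every subscription of that client that matches the topic
     * @param copyPerSubscription true: one copy per matching subscription; false: a single copy
     */
    static List<Integer> copies(int publishQos, List<Integer> matchingSubQos, boolean copyPerSubscription) {
        List<Integer> result = new ArrayList<>();
        if (matchingSubQos.isEmpty()) {
            return result; // nothing matches, nothing is delivered
        }
        if (copyPerSubscription) {
            // Optional behaviour: each copy respects its own subscription's QoS.
            for (int subQos : matchingSubQos) {
                result.add(Math.min(publishQos, subQos));
            }
        } else {
            // Required minimum: one copy, respecting the highest QoS among the matches.
            int maxQos = 0;
            for (int subQos : matchingSubQos) {
                maxQos = Math.max(maxQos, subQos);
            }
            result.add(Math.min(publishQos, maxQos));
        }
        return result;
    }

    public static void main(String[] args) {
        // Both subscriptions in the question use QoS 2 and the message is published at QoS 2.
        System.out.println(copies(2, List.of(2, 2), true));  // prints [2, 2]
        System.out.println(copies(2, List.of(2, 2), false)); // prints [2]
    }
}
```

Either result is compliant, which is why the number of PUBLISH packets the client sees can change from one broker to another.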

Each message will be delivered to each listener, so you get four lines of output.
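
The arithmetic behind the four lines can be reproduced without a broker. The sketch below is a plain-Java simulation, not Paho code: OverlapSimulation, matches and simulate are hypothetical names, the topic matcher is simplified (it ignores the special handling of topics starting with $), and it assumes every incoming copy is handed to every listener whose filter matches the topic.

```java
import java.util.ArrayList;
import java.util.List;

/** Self-contained simulation; no MQTT library is involved. */
class OverlapSimulation {

    /** Simplified topic-filter match: '+' matches one level, '#' matches the remainder. */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true;  // multi-level wildcard: the rest of the topic matches
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        return f.length == t.length;
    }

    /** Lines a client would print when `copies` PUBLISH packets arrive for one message. */
    static List<String> simulate(List<String> filters, String topic, int copies) {
        List<String> output = new ArrayList<>();
        for (int id = 1; id <= copies; id++) {
            for (String filter : filters) {
                if (matches(filter, topic)) {
                    output.add(filter + "  topic:" + topic + " id:" + id);
                }
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> filters = List.of("root/msg/1/#", "root/msg/+/#");
        // Two copies (message IDs 1 and 2), two matching listeners: four lines are printed.
        simulate(filters, "root/msg/1/data", 2).forEach(System.out::println);
    }
}
```

With a single copy from the broker the same two listeners would produce two lines, which matches the output the question expected.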

If you want to avoid this then consider using MQTT V5 with subscription identifiers. This would enable your message handlers to ignore any messages not relating to the specific SUBSCRIBE request.
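
A handler that ignores unrelated copies could look roughly like the following. This is a plain-Java sketch rather than Paho code: Received is a hypothetical stand-in for an incoming MQTT 5 PUBLISH and handledBy is an illustrative helper. It assumes the broker tags each copy only with the identifier of the subscription that caused it.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch; Received is a hypothetical stand-in for an incoming MQTT 5 PUBLISH. */
class SubscriptionIdFilter {

    static final class Received {
        final String payload;
        final List<Integer> subscriptionIds;

        Received(String payload, List<Integer> subscriptionIds) {
            this.payload = payload;
            this.subscriptionIds = subscriptionIds;
        }
    }

    /** Returns the payloads that a handler bound to subscriptionId would process. */
    static List<String> handledBy(int subscriptionId, List<Received> incoming) {
        List<String> handled = new ArrayList<>();
        for (Received msg : incoming) {
            // Skip copies that were sent on behalf of a different subscription.
            if (msg.subscriptionIds.contains(subscriptionId)) {
                handled.add(msg.payload);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        // Assumed broker behaviour: one copy per subscription, each with a single identifier.
        List<Received> onePerSubscription = List.of(
                new Received("TestMessage", List.of(1)),
                new Received("TestMessage", List.of(2)));
        System.out.println(handledBy(1, onePerSubscription).size()); // prints 1
        System.out.println(handledBy(2, onePerSubscription).size()); // prints 1

        // Behaviour reported in the comments for Artemis: every copy carries both identifiers.
        List<Received> allIdsOnEveryCopy = List.of(
                new Received("TestMessage", List.of(1, 2)),
                new Received("TestMessage", List.of(1, 2)));
        System.out.println(handledBy(1, allIdsOnEveryCopy).size()); // prints 2
    }
}
```

The last case shows the limit of this approach: if a broker attaches every identifier to every copy, as reported in the comments above, the filter cannot tell the copies apart and the handler still runs twice.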
