Spring Integrations. Sending messages to MQTT - Stack Overflow

As a learning exercise, I am trying to publish a message to a Mosquitto MQTT broker using Spring Integration. I've set up a config (mostly assembled from parts I found by googling; the listener part works well):

@Configuration
@RequiredArgsConstructor
@IntegrationComponentScan
public class MqttConfig {
    private final MqttProperties properties;
    private final ObjectMapper objectMapper;

    private List<String> sensorTopics;

    @PostConstruct
    void init() {
        sensorTopics = properties.getSensors().stream()
                .map(MqttProperties.Sensor::getTopic)
                .toList();
    }

    @Bean
    public IntegrationFlow sensorDataFlow(HumilityAndTempSensorMessageHandler humilityAndTempSensorMessageHandler) {
        return IntegrationFlow.from(
                        new MqttPahoMessageDrivenChannelAdapter(
                                properties.getUrl(),
                                "iotBridge",
                                "#")
                )
                .filter(getSensorTopicFilter())
                .transform(getTempSensorTransformer())
                .handle(humilityAndTempSensorMessageHandler::handleMessage)
                .get();
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }


    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{properties.getUrl()});
        options.setUserName(properties.getUsername());
        options.setPassword(properties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("exoluse", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("topic");
        return messageHandler;
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface PublishGateway {
        void send(String message, @Header(MqttHeaders.TOPIC) String topic);
    }

    private GenericTransformer<String, TempSensorReading> getTempSensorTransformer() {
        return source -> {
            try {
                return objectMapper.readValue(source, TempSensorReading.class);
            } catch (Exception e) {
                System.out.println("failed to resolve input");
            }
            return null;
        };
    }

    private GenericHandler<String> getSensorTopicFilter() {
        return (payload, headers) -> {
            String topic = Optional.ofNullable(headers)
                    .map(item -> item.get("mqtt_receivedTopic"))
                    .map(String::valueOf)
                    .orElse(null);
            return sensorTopics.contains(topic);
        };
    }
}

And I'm trying to send a message:

@Service
//@RequiredArgsConstructor
@Slf4j
public class SocketGateway {

    @Autowired
    private MqttConfig.PublishGateway publishGateway;

    @PostConstruct
    void t() {
        // ${SHELLY_ID}/command/switch:0 -m toggle
        String topic = "xxx/command/switch:0";
        System.out.println(">>>>>> "+publishGateway);
        publishGateway.send("toggle", topic);
    }
}

And I'm getting a "Dispatcher has no subscribers" exception.

Dependency version:

    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
      <version>6.4.1</version>
    </dependency>

What am I doing wrong?

UPD: full pom

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.2</version>
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>15</maven.compiler.source>
    <maven.compiler.target>15</maven.compiler.target>
    <javafx.version>17.0.1</javafx.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-mongodb</artifactId>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.28</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
      <version>6.4.1</version>
    </dependency>

  </dependencies>
</project>

Full stacktrace of the error:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'socketGateway': Invocation of init method failed
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:222) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:419) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1762) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:598) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:973) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:942) ~[spring-context-6.0.11.jar:6.0.11]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:608) ~[spring-context-6.0.11.jar:6.0.11]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:734) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:436) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:312) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1306) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1295) ~[spring-boot-3.1.2.jar:3.1.2]
    at .draakz.iotbridge.Application.main(Application.java:9) ~[classes/:na]
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.mqttOutboundChannel'.
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:327) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:466) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.sendOrSendAndReceive(GatewayProxyFactoryBean.java:669) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:584) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:550) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:540) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.11.jar:6.0.11]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:244) ~[spring-aop-6.0.11.jar:6.0.11]
    at jdk.proxy2/jdk.proxy2.$Proxy83.send(Unknown Source) ~[na:na]
    at .draakz.iotbridge.mqtt.publisher.SocketGateway.t(SocketGateway.java:26) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMethod.invoke(InitDestroyAnnotationBeanPostProcessor.java:457) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:401) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:219) ~[spring-beans-6.0.11.jar:6.0.11]
    ... 18 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.1.2.jar:6.1.2]
    ... 42 common frames omitted

Asked by Draaksward

1 Answer

You do this:

@PostConstruct
void t() {
    // ${SHELLY_ID}/command/switch:0 -m toggle
    String topic = "xxx/command/switch:0";
    System.out.println(">>>>>> "+publishGateway);
    publishGateway.send("toggle", topic);
}

This runs during the application context initialization phase. That is too early to interact with an external system, which is what sending that message tries to do.

I see that you use Spring Boot, so @EnableIntegration is indeed applied and that @ServiceActivator is picked up.

The only problem is that you emit messages into a channel that does not have subscribers yet, because the send happens during the application start phase.
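To make the ordering concrete, here is a minimal plain-Java sketch. It is a toy model and not Spring Integration's actual implementation: `ToyChannel`, `simulate`, and the three "phases" are hypothetical names used only for illustration. It assumes, as described above, that the handler is subscribed to the channel only after bean initialization has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class StartupOrderSketch {

    /** Toy stand-in for a direct channel: sending fails when nobody is subscribed. */
    static class ToyChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String payload) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("Dispatcher has no subscribers");
            }
            subscribers.forEach(s -> s.accept(payload));
        }
    }

    /** Runs the three phases in order and returns a log of what happened. */
    static List<String> simulate() {
        List<String> log = new ArrayList<>();
        ToyChannel channel = new ToyChannel();

        // Phase 1: bean initialization (where a @PostConstruct method would run).
        // The handler is not subscribed yet, so the send fails.
        try {
            channel.send("toggle");
        } catch (IllegalStateException e) {
            log.add("init: " + e.getMessage());
        }

        // Phase 2: endpoints start and subscribe their handlers to the channel.
        channel.subscribe(payload -> log.add("delivered: " + payload));

        // Phase 3: application is ready; the same send now reaches the handler.
        channel.send("toggle");
        return log;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

The first send fails for the same reason as in the stack trace: nothing is wrong with the channel or handler definitions, the send simply happens before the subscription exists.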

Consider using an @EventListener(ApplicationReadyEvent.class) method to emit messages instead of doing that from the @PostConstruct method.
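A minimal sketch of that change, applied to the SocketGateway class from the question. It assumes Spring Boot is on the classpath and that MqttConfig.PublishGateway is the gateway defined in the question; the method name sendInitialToggle and the switch to constructor injection are illustrative choices, not part of the original code.

```java
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;

// Also import MqttConfig from your own package if it lives elsewhere.
@Service
public class SocketGateway {

    private final MqttConfig.PublishGateway publishGateway;

    // Constructor injection instead of field injection (illustrative choice).
    public SocketGateway(MqttConfig.PublishGateway publishGateway) {
        this.publishGateway = publishGateway;
    }

    // Invoked when Spring Boot publishes ApplicationReadyEvent, that is, after the
    // context has been refreshed and the integration endpoints have been started.
    // "sendInitialToggle" is a hypothetical name.
    @EventListener(ApplicationReadyEvent.class)
    public void sendInitialToggle() {
        String topic = "xxx/command/switch:0";
        publishGateway.send("toggle", topic);
    }
}
```

With this, the gateway call is no longer part of bean initialization, so the service activator on mqttOutboundChannel should already be subscribed when the message is sent.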

I think we can try to improve the error report. Instead of "Dispatcher has no subscribers" we might emit something like "No message producing is allowed during the initialization phase". I am not sure whether we can detect such a state, but those are technical details.

Please raise a corresponding GitHub issue, and we will think about it for the next 6.5 version.
