最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

java - Apache Activemq classic scheduled send seems to be bugged with Spring Boot JMS - Stack Overflow

programmeradmin1浏览0评论

Unless I'm misreading the apache activeMQ docs, it appears that the use of AMQ_SCHEDULED_DELAY, AMQ_SCHEDULED_PERIOD, and AMQ_SCHEDULED_REPEAT is bugged when they are used together when sending messages using Spring JMS. The docs seem to suggest that when they are used in concert, the expected behavior is:

  • AMQ_SCHEDULED_DELAY: The time in milliseconds that a message will wait before being scheduled to be delivered by the broker
  • AMQ_SCHEDULED_PERIOD: The time in milliseconds to wait after the start time to wait before scheduling the message again
  • AMQ_SCHEDULED_REPEAT: The number of times to repeat scheduling a message for delivery

except I'm seeing that when using AMQ_SCHEDULED_PERIOD and AMQ_SCHEDULED_REPEAT, the initial message delivery is also delayed, even if AMQ_SCHEDULED_DELAY is explicitly set to 0. i.e. if AMQ_SCHEDULED_PERIOD is set to 1000 and AMQ_SCHEDULED_REPEAT is set to 2, and the message is sent at time 0, then the message is delivered at time 1000, 2000, and 3000. The same behavior happens when AMQ_SCHEDULED_DELAY is explicitly set to 0.

I threw together a small spring boot app to demo this:

ExampleApplication.java

package curious.jms.delay.bug.example;

import jakarta.jms.JMSException;
import jakarta.jms.TextMessage;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import .apache.activemq.ScheduledMessage;
import .springframework.beans.factory.annotation.Autowired;
import .springframework.boot.SpringApplication;
import .springframework.boot.autoconfigure.SpringBootApplication;
import .springframework.boot.context.event.ApplicationReadyEvent;
import .springframework.context.ApplicationContext;
import .springframework.context.event.EventListener;
import .springframework.jms.annotation.EnableJms;
import .springframework.jms.annotation.JmsListener;
import .springframework.jms.core.JmsTemplate;

@SpringBootApplication
@EnableJms
public class ExampleApplication {

  @Autowired
  JmsTemplate jmsTemplate;
  @Autowired
  private ApplicationContext context;

  private Map<String, Instant> messageSendTimes = new HashMap<>();
  private Map<String, List<Instant>> messageReceiveTimes = new HashMap<>();
  private final String QUEUE_NAME = "example.queue";

  public static void main(String[] args) {
    SpringApplication.run(ExampleApplication.class, args);
  }

  @EventListener(ApplicationReadyEvent.class)
  public void run() throws InterruptedException {
    sendRegularMessage();
    sendDelayedMessage();
    sendDelayedRepeatedMessage();
    Thread.sleep(10000);
    printResults(messageSendTimes, messageReceiveTimes);
    SpringApplication.exit(context, () -> 0);
  }

  private void sendRegularMessage() {
    String text = "regular";
    System.out.println("queueing message " + text);
    messageSendTimes.put(text, Instant.now());
    jmsTemplate.send(QUEUE_NAME, s -> s.createTextMessage(text));
  }

  private void sendDelayedMessage() {
    String text = "delayed";
    System.out.println("queueing message " + text);
    messageSendTimes.put(text, Instant.now());
    jmsTemplate.send(QUEUE_NAME, s -> {
      TextMessage m = s.createTextMessage(text);
      m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
      return m;
    });
  }

  private void sendDelayedRepeatedMessage() {
    String text = "delayed repeated";
    System.out.println("queueing message " + text);
    messageSendTimes.put(text, Instant.now());
    jmsTemplate.send(QUEUE_NAME, s -> {
      TextMessage m = s.createTextMessage(text);
      m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 0);
      m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
      m.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 5);
      return m;
    });
  }

  @JmsListener(destination = QUEUE_NAME)
  public void listener(TextMessage m) throws JMSException {
    if (messageReceiveTimes.containsKey(m.getText())) {
      messageReceiveTimes.get(m.getText()).add(Instant.now());
    } else {
      List<Instant> l = new ArrayList<>();
      l.add(Instant.now());
      messageReceiveTimes.put(m.getText(), l);
    }
    System.out.println("received message " + m.getText() + " at " + Instant.now());
  }

  private void printResults(Map<String, Instant> sendTimes, Map<String, List<Instant>> receiveTimes) {
    System.out.printf("%16s | %20s | %20s | %10s %n", "message", "sendTime", "receiveTime", "diff");
    for (String k : sendTimes.keySet()) {
      Instant sendTime = sendTimes.get(k);
      for (Instant instant : receiveTimes.get(k)) {
        System.out.printf("%16s | %20s | %20s | %8s %n", k, formatInstant(sendTime), formatInstant(instant),
          Duration.between(sendTime,
            instant).toMillis());
      }
    }
  }

  private String formatInstant(Instant i) {
    return DateTimeFormatter.ISO_TIME.format(i.atOffset(ZoneOffset.UTC));
  }
}

application.properties

spring.application.name=example
spring.activemq.broker-url=vm://localhost?broker.persistent=true&broker.schedulerSupport=true

build.gradle

plugins {
    id 'java'
    id '.springframework.boot' version '3.4.4'
    id 'io.spring.dependency-management' version '1.1.7'
}

group = 'curious.jms.delay.bug'
version = '0.0.1-SNAPSHOT'

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(17)
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation '.springframework.boot:spring-boot-starter-activemq'
    implementation '.apache.activemq:activemq-kahadb-store:6.1.6'
    testImplementation '.springframework.boot:spring-boot-starter-test'
    testRuntimeOnly '.junit.platform:junit-platform-launcher'
}

tasks.named('test') {
    useJUnitPlatform()
}

A run of the above application outputs the following

         message |             sendTime |          receiveTime |       diff 
delayed repeated |    19:22:12.0743304Z |    19:22:13.5186246Z |     1444 
delayed repeated |    19:22:12.0743304Z |    19:22:14.5030668Z |     2428 
delayed repeated |    19:22:12.0743304Z |    19:22:15.5038109Z |     3429 
delayed repeated |    19:22:12.0743304Z |    19:22:16.5035174Z |     4429 
delayed repeated |    19:22:12.0743304Z |    19:22:17.6968498Z |     5622 
delayed repeated |    19:22:12.0743304Z |    19:22:18.5176954Z |     6443 
         delayed |    19:22:12.0642786Z |    19:22:13.5080843Z |     1443 
         regular |    19:22:12.0005657Z |    19:22:12.0672787Z |       66 

notice how the first row has a ms diff of 1444? I would expect this to be similar to the value of the "regular" row. This seems to indicate that the period is changing the delivery time (delay) of the first message. When sending messages via the activeMQ broker ui, the first message is sent immediately as far as I can tell. Is this a bug, or did I read the docs wrong?

发布评论

评论列表(0)

  1. 暂无评论