We want to commit JMS read transactions based on a property in the middle of our flow. Our implementation currently uses an executor to achieve this.
@Bean
public Consumer<JmsDefaultListenerContainerSpec> jmsListenerContainerSpec() {
return containerSpec -> {
containerSpec.receiveTimeout(20_000L);
containerSpec.maxConcurrentConsumers(1);
containerSpec.sessionTransacted(true);
};
}
@Bean
public Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec(
@Qualifier("jmsTaskExecutor") Executor jmsTaskExecutor) {
return channels -> channels.executor(jmsTaskExecutor);
}
@Bean(name = "jmsTaskExecutor")
@ConditionalOnProperty(value = "appmit-jms-reads-early", havingValue = "true")
public Executor jmsTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// setting this to 0 mimics the behavior of Executors.newCachedThreadPool();
// where no tasks are queued and a new thread is created as needed if none
// are available in the cache
taskExecutor.setQueueCapacity(0);
return taskExecutor;
}
@Bean(name = "jmsTaskExecutor")
@ConditionalOnProperty(value = "appmit-jms-reads-early", havingValue = "false", matchIfMissing = true)
public Executor synchronousJmsTaskExecutor() {
SyncTaskExecutor taskExecutor = new SyncTaskExecutor();
return taskExecutor;
}
@Bean
public Consumer<HeaderEnricherSpec> errorChannelSpec(MessageChannel genericExceptionChannel) {
return h -> h.header(MessageHeaders.ERROR_CHANNEL, genericExceptionChannel);
}
@Bean
public IntegrationFlow jmsMessageFlow(
@Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory,
Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec)
{
return IntegrationFlow.from(
Jms.messageDrivenChannelAdapter(connectionFactory)
.destination("INCOMING_QUEUE")
.configureListenerContainer(
jmsListenerContainerSpec.andThen(spec -> spec.id("ListenerContainer")))
.errorChannel(genericExceptionChannel)
.outputChannel("messageHandlingChannel"))
// save message in db
.handle(
(payload, headers) -> databaseService.save(payload),
spec -> spec.advice(messageRetryAdvice).id("persistClientMessage"))
// new thread so that the jms message is acknowledged
.channel(jmsTxCommitingChannelSpec)
.enrichHeaders(errorChannelSpec)
.handle(
(payload, headers) -> messageParser.extractMessageMetadata(payload),
spec -> spec.id("extractMessageMetadata"))
.handle(
(payload, headers) -> databaseService.update(payload))
.handle(Jms.outboundAdapter(connectionFactory)
.destination(getQueueName())
.configureJmsTemplate(jmsTemplateSpec -> jmsTemplateSpec.id("jmsTemplate")))
.get();
}
Are there are issues using a SyncTaskExecutor
here whenever the property appmit-jms-reads-early
is not set.
From Spring Documentation
SyncTaskExecutor: This implementation does not run invocations asynchronously. Instead, each invocation takes place in the calling thread. It is primarily used in situations where multi-threading is not necessary, such as in simple test cases.
We want to commit JMS read transactions based on a property in the middle of our flow. Our implementation currently uses an executor to achieve this.
@Bean
public Consumer<JmsDefaultListenerContainerSpec> jmsListenerContainerSpec() {
return containerSpec -> {
containerSpec.receiveTimeout(20_000L);
containerSpec.maxConcurrentConsumers(1);
containerSpec.sessionTransacted(true);
};
}
@Bean
public Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec(
@Qualifier("jmsTaskExecutor") Executor jmsTaskExecutor) {
return channels -> channels.executor(jmsTaskExecutor);
}
@Bean(name = "jmsTaskExecutor")
@ConditionalOnProperty(value = "appmit-jms-reads-early", havingValue = "true")
public Executor jmsTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// setting this to 0 mimics the behavior of Executors.newCachedThreadPool();
// where no tasks are queued and a new thread is created as needed if none
// are available in the cache
taskExecutor.setQueueCapacity(0);
return taskExecutor;
}
@Bean(name = "jmsTaskExecutor")
@ConditionalOnProperty(value = "appmit-jms-reads-early", havingValue = "false", matchIfMissing = true)
public Executor synchronousJmsTaskExecutor() {
SyncTaskExecutor taskExecutor = new SyncTaskExecutor();
return taskExecutor;
}
@Bean
public Consumer<HeaderEnricherSpec> errorChannelSpec(MessageChannel genericExceptionChannel) {
return h -> h.header(MessageHeaders.ERROR_CHANNEL, genericExceptionChannel);
}
@Bean
public IntegrationFlow jmsMessageFlow(
@Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory,
Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec)
{
return IntegrationFlow.from(
Jms.messageDrivenChannelAdapter(connectionFactory)
.destination("INCOMING_QUEUE")
.configureListenerContainer(
jmsListenerContainerSpec.andThen(spec -> spec.id("ListenerContainer")))
.errorChannel(genericExceptionChannel)
.outputChannel("messageHandlingChannel"))
// save message in db
.handle(
(payload, headers) -> databaseService.save(payload),
spec -> spec.advice(messageRetryAdvice).id("persistClientMessage"))
// new thread so that the jms message is acknowledged
.channel(jmsTxCommitingChannelSpec)
.enrichHeaders(errorChannelSpec)
.handle(
(payload, headers) -> messageParser.extractMessageMetadata(payload),
spec -> spec.id("extractMessageMetadata"))
.handle(
(payload, headers) -> databaseService.update(payload))
.handle(Jms.outboundAdapter(connectionFactory)
.destination(getQueueName())
.configureJmsTemplate(jmsTemplateSpec -> jmsTemplateSpec.id("jmsTemplate")))
.get();
}
Are there are issues using a SyncTaskExecutor
here whenever the property appmit-jms-reads-early
is not set.
From Spring Documentation
SyncTaskExecutor: This implementation does not run invocations asynchronously. Instead, each invocation takes place in the calling thread. It is primarily used in situations where multi-threading is not necessary, such as in simple test cases.
Share Improve this question asked Feb 4 at 19:13 VPN236VPN236 273 bronze badges1 Answer
Reset to default 0You are right. There is indeed an issue with that SyncTaskExecutor
for your use-case.
See its implementation:
@Override
public void execute(Runnable task) {
Assert.notNull(task, "Runnable must not be null");
task.run();
}
unlike something like SimpleAsyncTaskExecutor
:
protected void doExecute(Runnable task) {
newThread(task).start();
}
So, the problem that with this SyncTaskExecutor
logic, your transaction is not going to be committed because you just don't leave that JMS thread and perform the rest of the flow exactly in this one.