[Problem]
We are facing an issue where a connection created in one thread is being shared with another thread when a new request comes in. This happens even though each request should have its own independent connection.
[Key constraints]
- A connection is created per thread, but when another thread starts a request, the connection from the previous thread gets shared.
- Due to business requirements, we cannot set
TcpNioClientConnectionFactory.setSingleUse(true).
[Tech Stack]
- Spring Integration 5.1.0
- Spring Boot 2.1.0
- JDK 1.8
Below is our TcpClientConfig code:
@EnableIntegration
@Configuration
@RequiredArgsConstructor
@Slf4j
@ComponentScan
public class TcpClientConfig implements ApplicationEventPublisherAware {
private final Properties properties;
private static final long TIMEOUT = 20000L;
private ApplicationEventPublisher applicationEventPublisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
@Bean
public AbstractClientConnectionFactory clientConnectionFactory() {
TcpNioClientConnectionFactory tcpNioClientConnectionFactory = new TcpNioClientConnectionFactory(properties.getIp(), properties.getPort());
tcpNioClientConnectionFactory.setUsingDirectBuffers(true);
tcpNioClientConnectionFactory.setSingleUse(true);
tcpNioClientConnectionFactory.setSerializer(new ByteArrayCrSerializer());
tcpNioClientConnectionFactory.setDeserializer(new ByteArrayCrSerializer());
tcpNioClientConnectionFactory.setApplicationEventPublisher(applicationEventPublisher);
return tcpNioClientConnectionFactory;
}
public void closeConnection() throws Exception {
try {
TcpConnectionSupport oldConnection = clientConnectionFactory().getConnection();
if (oldConnection != null) {
oldConnection.close();
}
} catch (Exception e){
throw new Exception("error", e);
}
}
@Bean
public MessageChannel outboundChannel() {
return new DirectChannel();
}
@Bean
public QueueChannel inboundChannel() {
return new QueueChannel();
}
@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler outboundGateway(AbstractClientConnectionFactory clientConnectionFactory) {
TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
tcpOutboundGateway.setConnectionFactory(clientConnectionFactory);
tcpOutboundGateway.setRequestTimeout(TIMEOUT);
tcpOutboundGateway.setRemoteTimeout(TIMEOUT);
tcpOutboundGateway.setReplyChannel(inboundChannel());
return tcpOutboundGateway;
}
@EventListener
public void handleTcpConnectionEvent(TcpConnectionOpenEvent event) {
log.info("============================== TCP Connection Opened : {} ==============================", event.getConnectionId());
}
@EventListener
public void handleTcpConnectionCloseEvent(TcpConnectionCloseEvent event) {
log.info("============================== TCP Connection Closed : {} ==============================", event.getConnectionId());
}
}
And here is the calling code:
@Service
@Slf4j
@RequiredArgsConstructor
@ComponentScan
public class TcpMessageService {
private final TcpClientConfig tcpClientConfig;
public void sendMessage(byte[] tcpData) {
Message<byte[]> message = MessageBuilder.withPayload(tcpData).build();
tcpClientConfig.outboundChannel().send(message);
}
public void sendAck() {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
ByteArrayCrSerializer serializer = new ByteArrayCrSerializer();
serializer.serialize(ACK.getBytes(StandardCharsets.UTF_8), byteArrayOutputStream);
byte[] serializedAck = byteArrayOutputStream.toByteArray();
Message<byte[]> ackMessage = MessageBuilder.withPayload(serializedAck).build();
tcpClientConfig.outboundChannel().send(ackMessage);
} catch (IOException e) {
log.error("error", e);
}
}
public Message<?> receiveMessage() {
return tcpClientConfig.inboundChannel().receive(TIMEOUT);
}
}
[Desired Solution]
We need a way to ensure that each thread creates and uses its own connection without sharing it with other threads. Every new request from a different thread should always establish a fresh connection, completely independent of any previous connections.
How can we achieve this while keeping setSingleUse(false)
? Any suggestions or alternative approaches would be greatly appreciated!
[Problem]
We are facing an issue where a connection created in one thread is being shared with another thread when a new request comes in. This happens even though each request should have its own independent connection.
[Key constraints]
- A connection is created per thread, but when another thread starts a request, the connection from the previous thread gets shared.
- Due to business requirements, we cannot set
TcpNioClientConnectionFactory.setSingleUse(true).
[Tech Stack]
- Spring Integration 5.1.0
- Spring Boot 2.1.0
- JDK 1.8
Below is our TcpClientConfig code:
@EnableIntegration
@Configuration
@RequiredArgsConstructor
@Slf4j
@ComponentScan
public class TcpClientConfig implements ApplicationEventPublisherAware {
private final Properties properties;
private static final long TIMEOUT = 20000L;
private ApplicationEventPublisher applicationEventPublisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
@Bean
public AbstractClientConnectionFactory clientConnectionFactory() {
TcpNioClientConnectionFactory tcpNioClientConnectionFactory = new TcpNioClientConnectionFactory(properties.getIp(), properties.getPort());
tcpNioClientConnectionFactory.setUsingDirectBuffers(true);
tcpNioClientConnectionFactory.setSingleUse(true);
tcpNioClientConnectionFactory.setSerializer(new ByteArrayCrSerializer());
tcpNioClientConnectionFactory.setDeserializer(new ByteArrayCrSerializer());
tcpNioClientConnectionFactory.setApplicationEventPublisher(applicationEventPublisher);
return tcpNioClientConnectionFactory;
}
public void closeConnection() throws Exception {
try {
TcpConnectionSupport oldConnection = clientConnectionFactory().getConnection();
if (oldConnection != null) {
oldConnection.close();
}
} catch (Exception e){
throw new Exception("error", e);
}
}
@Bean
public MessageChannel outboundChannel() {
return new DirectChannel();
}
@Bean
public QueueChannel inboundChannel() {
return new QueueChannel();
}
@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler outboundGateway(AbstractClientConnectionFactory clientConnectionFactory) {
TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
tcpOutboundGateway.setConnectionFactory(clientConnectionFactory);
tcpOutboundGateway.setRequestTimeout(TIMEOUT);
tcpOutboundGateway.setRemoteTimeout(TIMEOUT);
tcpOutboundGateway.setReplyChannel(inboundChannel());
return tcpOutboundGateway;
}
@EventListener
public void handleTcpConnectionEvent(TcpConnectionOpenEvent event) {
log.info("============================== TCP Connection Opened : {} ==============================", event.getConnectionId());
}
@EventListener
public void handleTcpConnectionCloseEvent(TcpConnectionCloseEvent event) {
log.info("============================== TCP Connection Closed : {} ==============================", event.getConnectionId());
}
}
And here is the calling code:
@Service
@Slf4j
@RequiredArgsConstructor
@ComponentScan
public class TcpMessageService {
private final TcpClientConfig tcpClientConfig;
public void sendMessage(byte[] tcpData) {
Message<byte[]> message = MessageBuilder.withPayload(tcpData).build();
tcpClientConfig.outboundChannel().send(message);
}
public void sendAck() {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
ByteArrayCrSerializer serializer = new ByteArrayCrSerializer();
serializer.serialize(ACK.getBytes(StandardCharsets.UTF_8), byteArrayOutputStream);
byte[] serializedAck = byteArrayOutputStream.toByteArray();
Message<byte[]> ackMessage = MessageBuilder.withPayload(serializedAck).build();
tcpClientConfig.outboundChannel().send(ackMessage);
} catch (IOException e) {
log.error("error", e);
}
}
public Message<?> receiveMessage() {
return tcpClientConfig.inboundChannel().receive(TIMEOUT);
}
}
[Desired Solution]
We need a way to ensure that each thread creates and uses its own connection without sharing it with other threads. Every new request from a different thread should always establish a fresh connection, completely independent of any previous connections.
How can we achieve this while keeping setSingleUse(false)
? Any suggestions or alternative approaches would be greatly appreciated!
1 Answer
Reset to default 1The TcpNioClientConnectionFactory
with setSingleUse(false)
would create a single connection and let it to be reused between consumers of this factory.
Consider to use a CachingClientConnectionFactory
wrapper around your TcpNioClientConnectionFactory
. This way, even if with that setSingleUse(true)
internally, the connection is not going to be closed immediately, but rather returned to the cache. Plus, the cache logic is that no one connection would be used in different threads at the same time.
There is also a ThreadAffinityClientConnectionFactory
for your consideration.
See more info in docs:
https://docs.spring.io/spring-integration/reference/ip/tcp-connection-factories.html#caching-cf
https://docs.spring.io/spring-integration/reference/ip/tcp-connection-factories.html#tcp-affinity-cf
Due to business requirements, we cannot set
, but then I see in the codetcpNioClientConnectionFactory.setSingleUse(true);
? – Artem Bilan Commented Mar 14 at 14:20closeConnection()
is suspicious whentcpNioClientConnectionFactory.setSingleUse(true);
. – Artem Bilan Commented Mar 14 at 14:23setSingleUse(false)
concept. You really cannot have a new connection if you asked factory to be not single use. – Artem Bilan Commented Mar 14 at 14:27