java - Ensuring Independent Connections for Threads in Spring Integration TCP - Stack Overflow

[Problem]

We are facing an issue where a connection created in one thread is being shared with another thread when a new request comes in. This happens even though each request should have its own independent connection.

[Key constraints]

  1. A connection is created per thread, but when another thread starts a request, the connection from the previous thread gets shared.
  2. Due to business requirements, we cannot set TcpNioClientConnectionFactory.setSingleUse(true).

[Tech Stack]

  • Spring Integration 5.1.0
  • Spring Boot 2.1.0
  • JDK 1.8

Below is our TcpClientConfig code:

@EnableIntegration
@Configuration
@RequiredArgsConstructor
@Slf4j
@ComponentScan
public class TcpClientConfig implements ApplicationEventPublisherAware {

    private final Properties properties;
    private static final long TIMEOUT = 20000L;
    private ApplicationEventPublisher applicationEventPublisher;

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    @Bean
    public AbstractClientConnectionFactory clientConnectionFactory() {
        TcpNioClientConnectionFactory tcpNioClientConnectionFactory = new TcpNioClientConnectionFactory(properties.getIp(), properties.getPort());
        tcpNioClientConnectionFactory.setUsingDirectBuffers(true);
        // Note: per the asker's later comment, the real code calls setSingleUse(false) here.
        tcpNioClientConnectionFactory.setSingleUse(true);
        tcpNioClientConnectionFactory.setSerializer(new ByteArrayCrSerializer());
        tcpNioClientConnectionFactory.setDeserializer(new ByteArrayCrSerializer());
        tcpNioClientConnectionFactory.setApplicationEventPublisher(applicationEventPublisher);
        return tcpNioClientConnectionFactory;
    }


    public void closeConnection() throws Exception {
        try {
            TcpConnectionSupport oldConnection = clientConnectionFactory().getConnection();
            if (oldConnection != null) {
                oldConnection.close();
            }
        } catch (Exception e){
            throw new Exception("error", e);
        }
    }

    @Bean
    public MessageChannel outboundChannel() {
        return new DirectChannel();
    }

    @Bean
    public QueueChannel inboundChannel() {
        return new QueueChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "outboundChannel")
    public MessageHandler outboundGateway(AbstractClientConnectionFactory clientConnectionFactory) {
        TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
        tcpOutboundGateway.setConnectionFactory(clientConnectionFactory);
        tcpOutboundGateway.setRequestTimeout(TIMEOUT);
        tcpOutboundGateway.setRemoteTimeout(TIMEOUT);
        tcpOutboundGateway.setReplyChannel(inboundChannel());
        return tcpOutboundGateway;
    }


    @EventListener
    public void handleTcpConnectionEvent(TcpConnectionOpenEvent event) {
        log.info("============================== TCP Connection Opened : {} ==============================", event.getConnectionId());
    }

    @EventListener
    public void handleTcpConnectionCloseEvent(TcpConnectionCloseEvent event) {
        log.info("============================== TCP Connection Closed : {} ==============================", event.getConnectionId());
    }
}

And here is the calling code:

@Service
@Slf4j
@RequiredArgsConstructor
@ComponentScan
public class TcpMessageService {

    private final TcpClientConfig tcpClientConfig;


    public void sendMessage(byte[] tcpData) {
        Message<byte[]> message = MessageBuilder.withPayload(tcpData).build();
        tcpClientConfig.outboundChannel().send(message);
    }


    public void sendAck() {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
            ByteArrayCrSerializer serializer = new ByteArrayCrSerializer();
            serializer.serialize(ACK.getBytes(StandardCharsets.UTF_8), byteArrayOutputStream);
            byte[] serializedAck = byteArrayOutputStream.toByteArray();

            Message<byte[]> ackMessage = MessageBuilder.withPayload(serializedAck).build();
            tcpClientConfig.outboundChannel().send(ackMessage);

        } catch (IOException e) {
            log.error("error", e);
        }
    }


    public Message<?> receiveMessage() {
        return tcpClientConfig.inboundChannel().receive(TIMEOUT);
    }
}

[Desired Solution]

We need a way to ensure that each thread creates and uses its own connection without sharing it with other threads. Every new request from a different thread should always establish a fresh connection, completely independent of any previous connections.

How can we achieve this while keeping setSingleUse(false)? Any suggestions or alternative approaches would be greatly appreciated!

asked Mar 14 at 9:15 by aileen
  • Why do you say "Due to business requirements, we cannot set TcpNioClientConnectionFactory.setSingleUse(true)", when the code in the question calls tcpNioClientConnectionFactory.setSingleUse(true)? – Artem Bilan Commented Mar 14 at 14:20
  • Your closeConnection() also looks suspicious when tcpNioClientConnectionFactory.setSingleUse(true) is set. – Artem Bilan Commented Mar 14 at 14:23
  • Your question contradicts the setSingleUse(false) concept. You really cannot get a new connection each time if you asked the factory not to be single-use. – Artem Bilan Commented Mar 14 at 14:27
  • @ArtemBilan Thank you for your comment and for pointing that out! I pasted the wrong code; that was my mistake. I actually set 'setSingleUse(false)' instead of 'setSingleUse(true)'. Is there a way to ensure that each thread creates and uses its own connection without sharing it with other threads? – aileen Commented Mar 17 at 1:21
  • So please correct the code in your question. – user207421 Commented Mar 17 at 22:34

1 Answer

With setSingleUse(false), the TcpNioClientConnectionFactory creates a single connection and lets every consumer of the factory reuse it.

Consider wrapping your TcpNioClientConnectionFactory in a CachingClientConnectionFactory. This way, even though setSingleUse(true) is applied internally, the connection is not closed immediately after use but is returned to the cache. In addition, the cache logic ensures that no connection is used by different threads at the same time.
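
As a rough sketch of the wrapper approach (not the asker's actual configuration): the host, port, pool size, class name, and bean names below are placeholder assumptions, and the custom serializer from the question is left out. The CachingClientConnectionFactory constructor takes the target factory and a pool size.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory;

@Configuration
public class CachingTcpClientConfig {

    // Placeholder values (assumptions); the question reads them from a Properties bean.
    private static final String HOST = "localhost";
    private static final int PORT = 9000;
    private static final int POOL_SIZE = 10;

    // The real connection factory; serializer/deserializer setup from the question is omitted.
    @Bean
    public TcpNioClientConnectionFactory targetConnectionFactory() {
        TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory(HOST, PORT);
        factory.setUsingDirectBuffers(true);
        return factory;
    }

    // The wrapper hands out pooled connections; a connection is returned to the
    // pool after use instead of being physically closed.
    @Bean
    public CachingClientConnectionFactory cachingConnectionFactory() {
        return new CachingClientConnectionFactory(targetConnectionFactory(), POOL_SIZE);
    }
}
```

The outbound gateway would then be given the caching factory rather than the target, for example tcpOutboundGateway.setConnectionFactory(cachingConnectionFactory).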

There is also a ThreadAffinityClientConnectionFactory for your consideration.
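
A minimal sketch of the thread-affinity variant, assuming placeholder host and port values and hypothetical class and bean names. According to the reference documentation linked below, this factory (introduced in Spring Integration 5.0) binds a connection to the calling thread, and the underlying factory must provide non-shared (single-use) connections; whether that fits the constraint in the question is something to verify.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.ThreadAffinityClientConnectionFactory;

@Configuration
public class ThreadAffinityTcpClientConfig {

    // Placeholder values (assumptions); the question reads them from a Properties bean.
    private static final String HOST = "localhost";
    private static final int PORT = 9000;

    // Underlying factory: single-use here, so each thread is handed its own connection.
    @Bean
    public TcpNioClientConnectionFactory targetConnectionFactory() {
        TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory(HOST, PORT);
        factory.setSingleUse(true);
        return factory;
    }

    // Wrapper that remembers which connection belongs to which calling thread
    // and reuses it for that thread's subsequent sends.
    @Bean
    public ThreadAffinityClientConnectionFactory threadAffinityConnectionFactory() {
        return new ThreadAffinityClientConnectionFactory(targetConnectionFactory());
    }
}
```

A thread keeps its connection until the connection is closed or the thread calls releaseConnection() on the ThreadAffinityClientConnectionFactory, so pooled worker threads would likely want to call that method when a request cycle ends.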

See more info in docs:

https://docs.spring.io/spring-integration/reference/ip/tcp-connection-factories.html#caching-cf

https://docs.spring.io/spring-integration/reference/ip/tcp-connection-factories.html#tcp-affinity-cf
