I'm trying to upload files to multiple SFTP server locations based on the request. I read the sftp connection details from database and create a session factory, that is stored in a map of session factories based on a key. Now, when I try to create messagehandler for the same, I need to create a dynamic message handler with session factory created and stored in the map. I'm not sure how to dynamically use that map and messagehandler. Couldn't find enough source. I have given my code below. I would like to know which is the best approach for this. I did see spring integration flows, but I couldn't get how to use that dynamically. Can someone please help on this?
Message builder
public class SFTPService {
private final MessageChannel uploadEmployerSftpChannel;
@Retryable(
retryFor = { MessagingException.class, IOException.class },
backoff = @Backoff(delay = 5000)
)
public void uploadFromSftpConnection(@NonNull final SftpConnection sftpConnection,
@NonNull final Resource file,
@NonNull final String groupId) throws IOException {
if (UploadEnabled) {
log.info("Uploading file {} to server via InputStream with sftpRemoteDir header: {}", file.getFilename(), sftpConnection.getRemoteDirectory());
final Message<InputStream> message = MessageBuilder.withPayload(file.getInputStream())
.setHeader(SFTP_REMOTE_DIR, sftpConnection.getRemoteDirectory())
.setHeader(FILE_NAME, file.getFilename())
.setHeader(GROUP_ID, groupId)
.build();
uploadEmployerSftpChannel.send(message);
log.info("File successfully uploaded to client server path: {}", sftpConnection.getRemoteDirectory());
} else {
log.info("upload is disabled. Skipping upload of file {} to client server.", file.getFilename());
}
}
}
SFTP Message handler
@RequiredArgsConstructor
@Configuration
public class SftpRuntimeSessionFactoryLocator {
private final Map<Object, SessionFactory<SftpClient.DirEntry>> sftpSessionFactoryMap = new HashMap<>();
private final SftpConnectionService sftpConnectionService;
public SessionFactory getSessionFactory(final Object groupId) {
if (!sftpSessionFactoryMap.containsKey(groupId)) {
sftpSessionFactoryMap.put(groupId, generateSessionFactory(groupId.toString()));
}
return sftpSessionFactoryMap.get(groupId);
}
public SessionFactory<SftpClient.DirEntry> generateSessionFactory(final String key) {
//get sftp connection details from database
final SftpConnection sftpConnection = getSftpConnection(key);
final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(sftpConnection.getHost());
factory.setPort(sftpConnection.getPort());
factory.setPassword(sftpConnection.getPassword());
factory.setUser(sftpConnection.getUser());
factory.setAllowUnknownKeys(sftpConnection.isAllowUnknownKeys());
return new CachingSessionFactory<>(factory);
}
private SftpConnection getSftpConnection(final String groupId) {
return sftpConnectionService.getSFtpConnectionDetailsByGroupId(groupId);
}
@Bean
public MessageChannel uploadEmployerSftpChannel() {
return new DirectChannel();
}
@ServiceActivator(inputChannel = "uploadEmployerSftpChannel")
public MessageHandler dynamicSftpMessageHandler() {
return message -> {
final SessionFactory sftpSessionFactory = getSessionFactory(message.getHeaders().get("groupId"));
final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory);
handler.setUseTemporaryFileName(false);
handler.setRemoteDirectoryExpression(new FunctionExpression<Message<?>>(msg
-> msg.getHeaders().get("sftpRemoteDir", String.class)
));
handler.setFileNameGenerator(msg -> {
if (msg.getHeaders().containsKey("filename")) {
return msg.getHeaders().get("filename", String.class);
}
if (msg.getPayload() instanceof File) {
return ((File) msg.getPayload()).getName();
}
throw new IllegalArgumentException("filename not provided for SFTP Message");
});
};
}
}
When I try to connect, I get error saying
Caused by: .springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [.springframework.integration.handler.MethodInvokingMessageProcessor@1b030042] (sftpRuntimeSessionFactoryLocator.dynamicSftpMessageHandler.serviceActivator)], failedMessage=GenericMessage [payload=sun.nio.ch.ChannelInputStream@378c9b97, headers={sftpRemoteDir=/target/outbound, filename=testfile.txt, id=aa6242a1-fce3-6ad4-c239-04131046f456, groupId=789, timestamp=1742477124410}]
at .springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at .springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
at .springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at .springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at .springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
at .springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at .springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at .springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at .springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:390)
at .springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:361)
at .springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:331)
at .springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:304)
at company.notification.service.SftpService.uploadFromSftpConnection(SftpService.java:104)
... 158 more
Caused by: .springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available