Я пытаюсь создать IntegrationFlow, где я читаю список файлов из удаленного каталога SFTP и загружаю его один за другим.У меня есть несколько каталогов для опроса, и каждый каталог имеет свою собственную фабрику сессий в заголовке.Имя отдельного файла происходит из полезной нагрузки первого IntegrationFlow, который получает для меня список файлов.
Проблема в том, что я не могу отправить заголовки и полезную нагрузку методу для создания OutboundGateway.
Вот код того, чего я пытаюсь достичь:
private IntegrationFlow sftpDownloadFlow(RegionalSFTConfig regionalSFTConfig, SessionFactory regionalSftpSessionFactory, String region) {
return IntegrationFlows.
from(ftpSource(regionalSFTConfig),
configure -> configure.poller(Pollers.fixedRate(regionalSFTConfig.getPollInterval())))
.enrichHeaders((headers) -> {
headers.header("sftSessionFactory",regionalSftpSessionFactory);
headers.header("sftConfig",regionalSFTConfig);
})
.handle(getRemoteFileList(regionalSFTConfig, regionalSftpSessionFactory))
.filter(Objects::nonNull)
.split(FileInfo.class, FileInfo::getFilename)
.channel("sftp.download")
.get();
}
@Bean
public IntegrationFlow sftpRegion1Flow() {
return sftpDownloadFlow(region1Config, region1SessionFactory);
}
@Bean
public IntegrationFlow sftpRegion2Flow() {
return sftpDownloadFlow(region2Config, region2SessionFactory);
}
private AbstractRemoteFileOutboundGateway<ChannelSftp.LsEntry> getRemoteFileOutboundGateway(MessageHeaders headers, String fileName) {
AbstractRemoteFileOutboundGateway<ChannelSftp.LsEntry> fileOutboundGateway =
Sftp.outboundGateway((SessionFactory)headers.get("sftSessionFactory"),
AbstractRemoteFileOutboundGateway.Command.GET,
new SpelExpressionParser().parseExpression( "'" + ((String)headers.get("file_remoteDirectory")).concat(fileName) + "'").getExpressionString())
//.localDirectory(new File(regionalSFTConfig.getLocalDirectory()))
.options(AbstractRemoteFileOutboundGateway.Option.STREAM, AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP)
.get();
fileOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE);
return fileOutboundGateway;
}
@Bean
public IntegrationFlow DownloadAndProcessFlow() {
return IntegrationFlows.
from("sftp.download")
.handle(getRemoteFileOutboundGateway(headers, payload))
.handle(someFileProcessor)
.get();
}
Здесь, в DownloadAndProcessFlow, я хочу иметь возможность отправлять заголовки и полезную нагрузку методу getRemoteFileOutboundGateway во время выполнения, чтобы создать шлюз на основе SFTConfig и SessionFactory в заголовках.Однако я не могу получить информацию о заголовках и полезной нагрузке на этом этапе.Есть указатели?