Построить SFTPOutboundGateway во время выполнения на основе значений в заголовке и полезной нагрузке - PullRequest
0 голосов
/ 22 мая 2019

Я пытаюсь создать IntegrationFlow, где я читаю список файлов из удаленного каталога SFTP и загружаю его один за другим.У меня есть несколько каталогов для опроса, и каждый каталог имеет свою собственную фабрику сессий в заголовке.Имя отдельного файла происходит из полезной нагрузки первого IntegrationFlow, который получает для меня список файлов.

Проблема в том, что я не могу отправить заголовки и полезную нагрузку методу для создания OutboundGateway.

Вот код того, чего я пытаюсь достичь:

private IntegrationFlow sftpDownloadFlow(RegionalSFTConfig regionalSFTConfig, SessionFactory regionalSftpSessionFactory, String region) {
    return IntegrationFlows.
            from(ftpSource(regionalSFTConfig),
                    configure -> configure.poller(Pollers.fixedRate(regionalSFTConfig.getPollInterval())))
            .enrichHeaders((headers) -> {
                headers.header("sftSessionFactory",regionalSftpSessionFactory);
                headers.header("sftConfig",regionalSFTConfig);

            })
            .handle(getRemoteFileList(regionalSFTConfig, regionalSftpSessionFactory))
            .filter(Objects::nonNull)
            .split(FileInfo.class, FileInfo::getFilename)
            .channel("sftp.download")
            .get();

}

@Bean
public IntegrationFlow sftpRegion1Flow() {
    return sftpDownloadFlow(region1Config, region1SessionFactory);
}
@Bean
public IntegrationFlow sftpRegion2Flow() {
    return sftpDownloadFlow(region2Config, region2SessionFactory);
}

private AbstractRemoteFileOutboundGateway<ChannelSftp.LsEntry> getRemoteFileOutboundGateway(MessageHeaders headers, String fileName) {
    AbstractRemoteFileOutboundGateway<ChannelSftp.LsEntry> fileOutboundGateway =
            Sftp.outboundGateway((SessionFactory)headers.get("sftSessionFactory"),
                    AbstractRemoteFileOutboundGateway.Command.GET,
                    new SpelExpressionParser().parseExpression( "'" + ((String)headers.get("file_remoteDirectory")).concat(fileName) + "'").getExpressionString())
                    //.localDirectory(new File(regionalSFTConfig.getLocalDirectory()))
                    .options(AbstractRemoteFileOutboundGateway.Option.STREAM, AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP)
                    .get();
    fileOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE);
    return fileOutboundGateway;

}

@Bean
public IntegrationFlow DownloadAndProcessFlow() {
    return IntegrationFlows.
            from("sftp.download")
            .handle(getRemoteFileOutboundGateway(headers, payload))
            .handle(someFileProcessor)
            .get();
}

Здесь, в DownloadAndProcessFlow, я хочу иметь возможность отправлять заголовки и полезную нагрузку методу getRemoteFileOutboundGateway во время выполнения, чтобы создать шлюз на основе SFTConfig и SessionFactory в заголовках.Однако я не могу получить информацию о заголовках и полезной нагрузке на этом этапе.Есть указатели?

...