Потоковая передача из удаленных каталогов и подкаталогов SFTP с помощью Spring Integration - PullRequest
0 голосов
/ 02 апреля 2020

Я использую Spring Integration Streaming Inbound Channel Adapter, чтобы получать поток от удаленного SFTP и анализировать все строки процесса обработки.

Я использую:

IntegrationFlows.from(Sftp.inboundStreamingAdapter(template)
                          .filter(remoteFileFilter)
                          .remoteDirectory("test_dir"),
                        e -> e.id("sftpInboundAdapter")
                              .autoStartup(true)
                              .poller(Pollers.fixedDelay(fetchInt)))
                .handle(Files.splitter(true, true))
....

И теперь он может работать , Но я могу получить файл только из каталога test_dir, но мне нужно рекурсивно получать файлы из этого каталога и подкаталога и анализировать каждую строку.

Я заметил, что Inbound Channel Adapter это Sftp.inboundAdapter(sftpSessionFactory).scanner(...). Он может сканировать подкаталог. Но я ничего не видел для Streaming Inbound Channel Adapter.

Итак, как мне реализовать «рекурсивное получение файлов из dir» в Streaming Inbound Channel Adapter?

Спасибо.

1 Ответ

1 голос
/ 02 апреля 2020

Вы можете использовать два исходящих шлюза - первый делает ls -R (рекурсивный список); разделите результат и используйте шлюз, настроенный на mget -stream, чтобы получить каждый файл.

РЕДАКТИРОВАТЬ

@SpringBootApplication
public class So60987851Application {

    public static void main(String[] args) {
        SpringApplication.run(So60987851Application.class, args);
    }

    @Bean
    IntegrationFlow flow(SessionFactory<LsEntry> csf) {
        return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5_000)))
                .handle(Sftp.outboundGateway(csf, Command.LS, "payload")
                        .options(Option.RECURSIVE, Option.NAME_ONLY)
                        // need a more robust metadata store for persistence, unless the files are removed
                        .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "test")))
                .split()
                .log()
                .enrichHeaders(headers -> headers.headerExpression("fileToRemove", "'foo/' + payload"))
                .handle(Sftp.outboundGateway(csf, Command.GET, "'foo/' + payload")
                        .options(Option.STREAM))
                .split(new FileSplitter())
                .log()
                // instead of a filter, we can remove the remote file.
                // but needs some logic to wait until all lines read
//              .handle(Sftp.outboundGateway(csf, Command.RM, "headers['fileToRemove']"))
//              .log()
                .get();
    }

    @Bean
    CachingSessionFactory<LsEntry> csf(DefaultSftpSessionFactory sf) {
        return new CachingSessionFactory<>(sf);
    }

    @Bean
    DefaultSftpSessionFactory sf() {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost("10.0.0.8");
        sf.setUser("gpr");
        sf.setPrivateKey(new FileSystemResource(new File("/Users/grussell/.ssh/id_rsa")));
        sf.setAllowUnknownKeys(true);
        return sf;
    }

}
...