Просматривайте удаленный каталог для добавленных файлов и передавайте его для чтения данных через SFTP - PullRequest
0 голосов
/ 30 апреля 2019

Я хочу добавить часы на удаленную машину для новых добавленных файлов CSV или непрочитанных. Как только файлы идентифицированы, прочитайте их в соответствии с их временной отметкой, которая будет указана в имени файла. Файл будет прочитан с использованием потоковой передачи, а скорее копированием на локальную машину. Во время чтения файла добавьте _reading к имени файла и добавьте _read после прочтения файла. Файл будет прочитан по протоколу SFTP, и я планирую использовать sftp для интеграции с пружиной. В случае ошибки при чтении файла или данных в файле не так, как ожидалось, я хочу переместить этот файл в подкаталог.

Я попытался опросить удаленный каталог и прочитать один раз файл CSV. После прочтения я удаляю файл из каталога.

 <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-sftp</artifactId>
            <version>5.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>5.0.6.RELEASE</version>
        </dependency>

Spring boot version 2.0.3.RELEASE   
 @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(hostname);
        factory.setPort(22);
        factory.setUser(username);
        factory.setPassword(password);
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<ChannelSftp.LsEntry>(factory);
    }

    @Bean
    public MessageSource<InputStream> sftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory(path);
        messageSource.setFilter(compositeFilters());
        return messageSource;
    }

    public CompositeFileListFilter compositeFilters() {
        return new CompositeFileListFilter()
                .addFilter(new SftpRegexPatternFileListFilter(".*csv"));
    }

    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @Bean
    public IntegrationFlow sftpOutboundListFlow() {
        return IntegrationFlows.from(this.sftpMessageSource(), e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
                .handle(Sftp.outboundGateway(template(), NLST, path).options(Option.RECURSIVE)))
                .filter(compositeFilters())
                .transform(sorter())
                .split()
                .handle(Sftp.outboundGateway(template(), GET, "headers['file_remoteDirectory'] + headers['file_remoteFile']").options(STREAM))
                .transform(csvToPojoTransformer())
                .handle(service())
                .handle(Sftp.outboundGateway(template(), MV, "headers['file_remoteDirectory'] + headers['file_remoteFile'] + _read"))
                .handle(after())
                .get();
    }

    @Bean
    public MessageHandler sorter() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                List<String> fileNames = (List<String>) message.getPayload();
                Collections.sort(fileNames);
            }
        };
    }

    @Bean
    public MessageHandler csvToPojoTransformer() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                InputStream streamData = (InputStream) message.getPayload();
                convertStreamtoObject(streamData, Class.class);
            }
        };
    }

    public List<?> convertStreamtoObject(InputStream inputStream, Class clazz) {
        HeaderColumnNameMappingStrategy ms = new HeaderColumnNameMappingStrategy();
        ms.setType(clazz);
        Reader reader = new InputStreamReader(inputStream);

        CsvToBean cb = new CsvToBeanBuilder(reader)
                .withType(clazz)
                .withMappingStrategy(ms)
                .withSkipLines(0)
                .withSeparator('|')
                .withThrowExceptions(true)
                .build();
        return cb.parse();
    }

    @Bean
    public MessageHandler service() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                List<Class> csvDataAsListOfPojo = List < Class > message.getPayload();
                // use this
            }
        };
    }
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString("payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }
    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

Обновленный код

1 Ответ

0 голосов
/ 30 апреля 2019

Для сложных сценариев (список, перемещение, выборка, удаление и т. Д.) Следует использовать удаленные файловые шлюзы SFTP * .

Исходящий шлюз SFTP предоставляет ограниченный наборкоманд, которые позволяют вам взаимодействовать с удаленным SFTP-сервером:

ls (список файлов)

nlst (список имен файлов)

get (получить файл)

mget (получить несколько файлов)

rm (удалить файлы)

mv (переместить и переименовать файл)

поставить (отправить файл)

mput (отправка нескольких файлов)

Или используйте SftpRemoteFileTemplate прямо из вашего кода.

EDIT

Inответ на ваши комментарии;вам нужно что-то вроде этого

  • Адаптер входящего канала (с поллером) - возвращает имя каталога
  • LS Gateway
  • Фильтр (удаляет все файлы, которые уже получены)
  • Transformer (сортировка списка)
  • Splitter
  • GET Gateway (опция потока)
  • Transformer (csv to POJO)
  • Сервис (процесс POJO)

Если вы добавите

  • RM Gateway

после обслуживания (для удаления удаленного файла), вам не нужношаг фильтра.

Возможно, Java DSL будет проще собрать этот поток ...

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(() -> "some/dir", e -> e.poller(...))
        .handle(...) // LS Gateway
        .filter(...)
        .transform(sorter())
        .split
        .handle(...) // GET Gateway
        .transform(csvToPojoTransformer())
        .handle(myService())
        .get()
}
...