Приложение источника файла Spring Cloud Stream - История обработанных файлов и файлов опроса в подкаталоге - PullRequest
1 голос
/ 09 апреля 2019

Я строю конвейер данных с приложением Spring Cloud Stream File Source в начале конвейера. Мне нужна помощь в работе над отсутствующими функциями

Исходное приложение «Мой файл» (на основе org.springframework.cloud.stream.app:spring-cloud-starter-stream-source-file) прекрасно работает, за исключением отсутствующих функций, с которыми мне нужна помощь. Мне нужно

  1. Для удаления файлов после опроса и обмена сообщениями
  2. Опрос в подкаталоги

Что касается пункта 1, я прочитал, что функция удаления не существует в приложении источника файла (она доступна в источнике sftp). Каждый раз, когда приложение перезапускается, файлы, которые были обработаны в прошлом, будут переизбираться, может ли история обработанных файлов стать постоянной? Есть ли легкая альтернатива?

Ответы [ 2 ]

0 голосов
/ 18 апреля 2019
@EnableBinding(Source.class)
@Import(TriggerConfiguration.class)
@EnableConfigurationProperties({FileSourceProperties.class, FileConsumerProperties.class,
    TriggerPropertiesMaxMessagesDefaultUnlimited.class})
public class FileSourceConfiguration {


@Autowired
@Qualifier("defaultPoller")
PollerMetadata defaultPoller;
@Autowired
Source source;
@Autowired
private FileSourceProperties properties;
@Autowired
private FileConsumerProperties fileConsumerProperties;

private Boolean alwaysAcceptDirectories = false;
private Boolean deletePostSend;
private Boolean movePostSend;
private String movePostSendSuffix;

@Bean
public IntegrationFlow fileSourceFlow() {
    FileInboundChannelAdapterSpec messageSourceSpec = Files.inboundAdapter(new File(this.properties.getDirectory()));

    RecursiveDirectoryScanner recursiveDirectoryScanner = new RecursiveDirectoryScanner();
    messageSourceSpec.scanner(recursiveDirectoryScanner);
    FileVisitOption[] fileVisitOption = new FileVisitOption[1];
    recursiveDirectoryScanner.setFilter(initializeFileListFilter());

    initializePostSendAction();

    IntegrationFlowBuilder flowBuilder = IntegrationFlows
            .from(messageSourceSpec,
                    new Consumer<SourcePollingChannelAdapterSpec>() {

                        @Override
                        public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
                            sourcePollingChannelAdapterSpec
                                    .poller(defaultPoller);
                        }

                    });

    ChannelInterceptor channelInterceptor = new ChannelInterceptor() {
        @Override
        public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
            if (sent) {
                File fileOriginalFile = (File) message.getHeaders().get("file_originalFile");
                if (fileOriginalFile != null) {
                    if (movePostSend) {
                        fileOriginalFile.renameTo(new File(fileOriginalFile + movePostSendSuffix));
                    } else if (deletePostSend) {
                        fileOriginalFile.delete();
                    }
                }
            }
        }

        //Override more interceptor methods to capture some logs here
    };
    MessageChannel messageChannel = source.output();
    ((DirectChannel) messageChannel).addInterceptor(channelInterceptor);
    return FileUtils.enhanceFlowForReadingMode(flowBuilder, this.fileConsumerProperties)
            .channel(messageChannel)
            .get();
}

private void initializePostSendAction() {
    deletePostSend = this.properties.isDeletePostSend();
    movePostSend = this.properties.isMovePostSend();
    movePostSendSuffix = this.properties.getMovePostSendSuffix();

    if (deletePostSend && movePostSend) {
        String errorMessage = "The 'delete-file-post-send' and 'move-file-post-send' attributes are mutually exclusive";
        throw new IllegalArgumentException(errorMessage);
    }

    if (movePostSend && (movePostSendSuffix == null || movePostSendSuffix.trim().length() == 0)) {
        String errorMessage = "The 'move-post-send-suffix' is required when 'move-file-post-send' is set to true.";
        throw new IllegalArgumentException(errorMessage);
    }

    //Add additional validation to ensure the user didn't configure a file move that will result in cyclic processing of file
}

private FileListFilter<File> initializeFileListFilter() {

    final List<FileListFilter<File>> filtersNeeded = new ArrayList<FileListFilter<File>>();

    if (this.properties.getFilenamePattern() != null && this.properties.getFilenameRegex() != null) {
        String errorMessage = "The 'filename-pattern' and 'filename-regex' attributes are mutually exclusive.";
        throw new IllegalArgumentException(errorMessage);
    }

    if (StringUtils.hasText(this.properties.getFilenamePattern())) {
        SimplePatternFileListFilter patternFilter = new SimplePatternFileListFilter(this.properties.getFilenamePattern());
        if (this.alwaysAcceptDirectories != null) {
            patternFilter.setAlwaysAcceptDirectories(this.alwaysAcceptDirectories);
        }
        filtersNeeded.add(patternFilter);
    } else if (this.properties.getFilenameRegex() != null) {
        RegexPatternFileListFilter regexFilter = new RegexPatternFileListFilter(this.properties.getFilenameRegex());
        if (this.alwaysAcceptDirectories != null) {
            regexFilter.setAlwaysAcceptDirectories(this.alwaysAcceptDirectories);
        }
        filtersNeeded.add(regexFilter);
    }

    FileListFilter<File> createdFilter = null;

    if (!Boolean.FALSE.equals(this.properties.isIgnoreHiddenFiles())) {
        filtersNeeded.add(new IgnoreHiddenFileListFilter());
    }

    if (Boolean.TRUE.equals(this.properties.isPreventDuplicates())) {
        filtersNeeded.add(new AcceptOnceFileListFilter<File>());
    }

    if (filtersNeeded.size() == 1) {
        createdFilter = filtersNeeded.get(0);
    } else {
        createdFilter = new CompositeFileListFilter<File>(filtersNeeded);
    }

    return createdFilter;
}

}

0 голосов
/ 09 апреля 2019

Для поддержки этих требований вам определенно необходимо изменить код упомянутого проекта File Source: https://docs.spring.io/spring-cloud-stream-app-starters/docs/Einstein.BUILD-SNAPSHOT/reference/htmlsingle/#_patching_pre_built_applications

Я бы предложил раскошелиться на проект и опросить его из GitHub как есть, так как вы собираетесьизменить существующий код проекта.Затем следуйте инструкциям в упомянутом документе, как создать целевой артефакт связующего, который будет совместим со средой SCDF.

Теперь о вопросах:

Для опроса подкаталогов для того жешаблон файла, вам нужно настроить RecursiveDirectoryScanner на Files.inboundAdapter():

/**
 * Specify a custom scanner.
 * @param scanner the scanner.
 * @return the spec.
 * @see FileReadingMessageSource#setScanner(DirectoryScanner)
 */
public FileInboundChannelAdapterSpec scanner(DirectoryScanner scanner) {

Обратите внимание, что все filters должны быть настроены на этом DirectoryScanner.В противном случае будет выдано предупреждение:

    // Check that the filter and locker options are _NOT_ set if an external scanner has been set.
    // The external scanner is responsible for the filter and locker options in that case.
    Assert.state(!(this.scannerExplicitlySet && (this.filter != null || this.locker != null)),
            () -> "When using an external scanner the 'filter' and 'locker' options should not be used. " +
                    "Instead, set these options on the external DirectoryScanner: " + this.scanner);

Чтобы отслеживать файлы, лучше использовать FileSystemPersistentAcceptOnceFileListFilter, основанный на внешнем хранилище постоянства для реализации ConcurrentMetadataStore: https://docs.spring.io/spring-integration/reference/html/#metadata-store. Это необходимо использовать вместо этого preventDuplicates(), потому что FileSystemPersistentAcceptOnceFileListFilter обеспечивает только один раз логика для нас.

Удаление файла после отправки может не иметь значения, поскольку вы можете просто отправить File как есть, и он должен быть доступен на другой стороне.

Кроме того, вы можете добавить ChannelInterceptor в source.output() и реализовать его postSend() ввыполнить ((File) message.getPayload()).delete(), что произойдет, когда сообщение будет успешно отправлено в место назначения подшивки.

...