Spring IntegrationFlow CompositeFileListFilter не работает - PullRequest
0 голосов
/ 27 января 2020

У меня есть два фильтра regexFilter и lastModified.

return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
            .localDirectory(this.getlocalDirectory(config.getId()))
            .deleteRemoteFiles(true)
            .autoCreateLocalDirectory(true)
            .regexFilter(config.getRegexFilter())
            .filter(new LastModifiedLsEntryFileListFilter())
            .remoteDirectory(config.getInboundDirectory())
            , e -> e.poller(Pollers.fixedDelay(60_000)
                    .errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {

    })))

При поиске в Google я понимаю, что должен использовать CompositeFileListFilter для регулярных выражений, поэтому измените мой код на

.filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))

Его компилируется, но во время выполнения выдает ошибку, и канал останавливается, и та же самая ошибка идет для

.filter(ftpPersistantFilter(config.getRegexFilter()))
.
.
.

public CompositeFileListFilter ftpPersistantFilter(String regexFilter) {
        CompositeFileListFilter filters = new CompositeFileListFilter();
            filters.addFilter(new FtpRegexPatternFileListFilter(regexFilter));
        return filters;
    }

Я просто хочу фильтровать по имени файла. Существует 2 потока для одной и той же удаленной папки, и оба опрашивают один и тот же cron, но должны выбрать соответствующий файл.

EDIT добавление последнего LastModifiedLsEntryFileListFilter. Работает нормально, но добавляется по запросу.

public class LastModifiedLsEntryFileListFilter implements FileListFilter<LsEntry> {

private final Logger log = LoggerFactory.getLogger(LastModifiedLsEntryFileListFilter.class);
private static final long DEFAULT_AGE = 60;

private volatile long age = DEFAULT_AGE;

private volatile Map<String, Long> sizeMap = new HashMap<String, Long>();


public long getAge() {
    return this.age;
}

public void setAge(long age) {
    setAge(age, TimeUnit.SECONDS);
}

public void setAge(long age, TimeUnit unit) {
    this.age = unit.toSeconds(age);
}

@Override
public List<LsEntry> filterFiles(LsEntry[] files) {

    List<LsEntry> list = new ArrayList<LsEntry>();

    long now = System.currentTimeMillis() / 1000;

    for (LsEntry file : files) {

        if (file.getAttrs()
                .isDir()) {
            continue;
        }
        String fileName = file.getFilename();
        Long currentSize = file.getAttrs().getSize();
        Long oldSize = sizeMap.get(fileName);

        if(oldSize == null || currentSize.longValue() != oldSize.longValue() ) {
            // putting size in map, will verify in next iteration of scheduler
            sizeMap.put(fileName, currentSize);
            log.info("[{}] old size [{}]  increased to [{}]...", file.getFilename(), oldSize, currentSize);
            continue;
        }

        int lastModifiedTime = file.getAttrs()
            .getMTime();

        if (lastModifiedTime + this.age <= now ) {
            list.add(file);
            sizeMap.remove(fileName);
        } else {
            log.info("File [{}] is still being uploaded...", file.getFilename());
        }
    }
    return list;
}

}

PS: когда я тестировал фильтр для регулярных выражений, я удалил LastModifiedLsEntryFileListFilter просто для простоты. Итак, мой последний поток -

return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
            .localDirectory(this.getlocalDirectory(config.getId()))
            .deleteRemoteFiles(true)
            .autoCreateLocalDirectory(true)
            .filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))
            //.filter(new LastModifiedLsEntryFileListFilter())
            .remoteDirectory(config.getInboundDirectory()),
            e -> e.poller(Pollers.fixedDelay(60_000)
                    .errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
                try {

                    this.destroy(String.valueOf(config.getId()));


    configurationService.removeConfigurationChannelById(config.getId());

//                // logging here
                } catch (Exception ex1) {
            }
            }))).publishSubscribeChannel(s -> s
            .subscribe(f -> {

                f.handle(Sftp.outboundAdapter(outboundSftp)
                        .useTemporaryFileName(false)
                        .autoCreateDirectory(true)
                        .remoteDirectory(config.getOutboundDirectory()), c -> c.advice(startup.deleteFileAdvice()));

            })
            .subscribe(f -> {
                if (doArchive) {
                    f.handle(Sftp.outboundAdapter(inboundSftp)
                            .useTemporaryFileName(false)
                            .autoCreateDirectory(true)
                            .remoteDirectory(config.getInboundArchiveDirectory()));
                } else {
                    f.handle(m -> {
                    });
                }

            })
            .subscribe(f -> f
            .handle(m -> {

                // I am handling exception here
            })
            ))
            .get();

и вот исключения

2020-01-27 21:36:55,731 INFO o.s.i.c.PublishSubscribeChannel - Channel 

'application.2.subFlow#0.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2020-01-27 21:36:55,731 INFO o.s.i.c.DirectChannel - Channel 'application.2.subFlow#2.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#1

EDIT После передачи регулярного выражения в LastModifiedLsEntryFileListFilter и дескриптор там работает для меня. Когда я использую любой другой RegexFilter внутри CompositeFileListFilter, возникает ошибка.

.filter(new CompositeFileListFilter().addFilter(new LastModifiedLsEntryFileListFilter(config.getRegexFilter())))

1 Ответ

1 голос
/ 27 января 2020

Покажите, пожалуйста, ваш последний поток. Я не вижу, чтобы вы использовали LastModifiedLsEntryFileListFilter в вашем CompositeFileListFilter ... Вы определенно не можете использовать regexFilter() и filter() вместе - последний выигрывает. Чтобы избежать путаницы, мы предлагаем использовать filter() и составить все те, которые имеют CompositeFileListFilter или ChainFileListFilter.

Также, о какой ошибке вы упоминаете, пожалуйста.

...