Получить все файлы, которые соответствуют фильтру один раз - PullRequest
1 голос
/ 11 июля 2019

Я пытаюсь получить количество файлов с моим фильтром из моего потокового апдатера ftp, поэтому после обработки всех файлов я хочу запустить удаленную оболочку или есть какой-либо другой способ узнать, что адаптер завершил отправку сообщения

Я уже пытался с CompositeFileListFilter переопределить общедоступный метод List filterFiles (F [] files), но он никогда не вызывается.

на данный момент я использую фиксированное количество файлов, но оно должно быть динамическим.

Я сделал переопределение этого метода в CompositeFileListFilter

@Override
    public List<F> filterFiles(F[] files) {
        log.info("received {} files", files.length);
        return super.filterFiles(files);
    }

У меня есть следующий поток интеграции, использующий атомный счетчик до 3, он должен быть 3.:

AtomicInteger messageCounter = new AtomicInteger(0);
        return IntegrationFlows.from(Ftp.inboundStreamingAdapter(goldv5template())
                .remoteDirectory("/inputFolder")
                .filter(new CompositeFileListFilterWithCount<>() {{
                    addFilter(new FtpSimplePatternFileListFilter("pattern1.*"));
                    addFilter(new FtpSimplePatternFileListFilter("pattern2.*"));
                    addFilter(new FtpSimplePatternFileListFilter("pattern3.*"));
                }})
            , pollerConfiguration)
            .transform(Transformers.fromStream(StandardCharsets.UTF_8.toString()))
            .log(message -> "process file " + message.getHeaders().get(FileHeaders.REMOTE_FILE))
            .handle(message -> {
                int numericValue = messageCounter.incrementAndGet();
                log.info("numeric value: {}", numericValue);
                if (numericValue == 3) {
                    messageCounter.set(0);
                    log.info("launch remote shell here now"));                 
                }
            }, e -> e.advice(after()))
            .get();

если я не использую счетчик, я получу удаленный вызов оболочки для каждого файла, и мне нужно, чтобы он вызывался только один раз, только после завершения потока, это запланировано на основе cronjob, поэтому я хочу вызвать это только один раз в конце.

Я использую фиксированную задержку в 1 с для теста, но он будет запускаться только три раза в день, мне нужно выбирать все время на всех часах.

это мой poller Конфигурация для теста:

sourcePollingChannelAdapterSpec -> sourcePollingChannelAdapterSpec.poller(pollerFactory -> pollerFactory.fixedRate(1000L))

UPDATE

Я попробовал то, что было предложено Артемом, но у меня странное поведение, я пытаюсь получить все файлы в определенной папке ftp в одном опросе, поэтому читаю документы:

если для параметра max-messages-per-poll установлено значение 1 (по умолчанию), он обрабатывает только один файл за раз с интервалами, определенными вашим триггером, по сути, работая как «one-poll === one-file». ».

В типичных случаях использования передачи файлов вам, скорее всего, понадобится противоположное поведение: обрабатывать все файлы, которые вы можете для каждого опроса, и только потом ждать следующего опроса. Если это так, задайте для max-messages-per-poll значение -1. Затем при каждом опросе адаптер пытается создать столько сообщений, сколько возможно ...

поэтому я установил для max-message-per-poll значение -1, поэтому каждый опрос дает мне каждый файл. Я добавил фильтр в только для файлов .xml и для предотвращения дублирования, acceptOnceFilter, но потоковый адаптер ftp предоставляет мне неограниченное количество раз тех же файлов, что не имеет смысла, я использовал для этого теста Исправлена ​​задержка 10 с.

2019-07-23 10:32:04.308  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process2 file sample1.xml
2019-07-23 10:32:04.312  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample1.xml
2019-07-23 10:32:04.313  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.313  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.315  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample2.xml
2019-07-23 10:32:04.324  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample2.xml
2019-07-23 10:32:04.324  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.324  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.326  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample3.xml
2019-07-23 10:32:04.330  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample3.xml
2019-07-23 10:32:04.331  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.331  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.333  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample4.xml
2019-07-23 10:32:04.337  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample4.xml
2019-07-23 10:32:04.338  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.338  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.341  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample1.xml
2019-07-23 10:32:04.345  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample1.xml
2019-07-23 10:32:04.346  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.346  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.347  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample2.xml
2019-07-23 10:32:04.351  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample2.xml
2019-07-23 10:32:04.351  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.351  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.353  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample3.xml
2019-07-23 10:32:04.356  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample3.xml
2019-07-23 10:32:04.356  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.357  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.358  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample4.xml
...............................
return IntegrationFlows
            .from(Ftp.inboundStreamingAdapter(testFlowTemplate())
                    .remoteDirectory("/inputTestFlow")
                    .filter(new CompositeFileListFilter<>() {{
                        addFilter(new AcceptOnceFileListFilter<>());
                        addFilter(new FtpSimplePatternFileListFilter("*.xml"));
                    }})
                , sourcePollingChannelAdapterSpec -> sourcePollingChannelAdapterSpec.poller(pollerConfiguration.maxMessagesPerPoll(-1)))
            .transform(Transformers.fromStream(StandardCharsets.UTF_8.toString()))
            .log(message -> {
                execution.setStartDate(new Date());
                return "process file " + message.getHeaders().get(FileHeaders.REMOTE_FILE);
            })
            .handle(Ftp.outboundAdapter(FTPServers.PC_LOCAL.getFactory(), FileExistsMode.REPLACE)
                    .useTemporaryFileName(false)
                    .fileNameExpression("headers['" + FileHeaders.REMOTE_FILE + "']")
                    .remoteDirectory("/output/")
                , e -> e.advice(testFlowAfter())
            )
            .get();

Обновление 2

Я достиг того, что мне нужно, создав этот пользовательский фильтр:

.filter(new FileListFilter<>() {
                        private final Set<String> seenSet = new HashSet<>();
                        private Date lastExecution;

                        @Override
                        public List<FTPFile> filterFiles(FTPFile[] files) {
                            return Arrays.stream(files).filter(ftpFile -> {
                                if (lastExecution!= null && TimeUnit.MILLISECONDS.toSeconds(new Date().getTime() - lastExecution.getTime()) >= 10L) {
                                    this.seenSet.clear();
                                }
                                lastExecution = new Date();
                                if (ftpFile.getName().endsWith(".xml")) {
                                    return this.seenSet.add(ftpFile.getRawListing());
                                }
                                return false;
                            }).collect(Collectors.toList());
                        }
                    })

но я использовал ручной 10-секундный интервал, который подходит для моих нужд, какой-нибудь другой умный способ улучшить этот код в зависимости от триггера?

1 Ответ

1 голос
/ 11 июля 2019

Я думаю, что триггер cron не является правильным решением, поскольку вы действительно хотели бы иметь единый процесс для всех извлеченных файлов.

Я думаю, что ваша логика в filterFiles() неверна.Вы действительно хотели бы установить счетчик на количество файлов, которые он собирается обработать, но не на исходную сумму:

@Override
public List<F> filterFiles(F[] files) {
    List<F> filteredFiles = super.filterFiles(files);
    log.info("received {} files", filteredFiles.size());
    return filteredFiles;
}

, и здесь вы действительно можете установить значение в messageCounter.

ОБНОВЛЕНИЕ

В фильтре есть эта функция:

/**
 * Indicates that this filter supports filtering a single file.
 * Filters that return true <b>must</b> override {@link #accept(Object)}.
 * Default false.
 * @return true to allow external calls to {@link #accept(Object)}.
 * @since 5.2
 * @see #accept(Object)
 */
default boolean supportsSingleFileFiltering() {
    return false;
}

Я думаю, что когда вы переопределяете ее в явном false в вашем CompositeFileListFilterWithCount, ты должен быть хорошим.В противном случае вы действительно правы: по умолчанию для каждого файла вызывается только простой accept().Просто потому, что все ваши FtpSimplePatternFileListFilter поставляются с true по умолчанию, и все они являются вкладом в true на уровне FtpSimplePatternFileListFilter.

Тем не менее все это говорит нам о том, что вы уже используете Spring Integration 5.2: -) ...

ОБНОВЛЕНИЕ 2

Попробуйте ChainFileListFilter не стоитПоместите AcceptOnceFileListFilter в конце цепочки.Хотя может быть лучше использовать FtpPersistentAcceptOnceFileListFilter вместо этого: он учитывает lastmodified для файла.Также рассмотрим включение в цепочку некоторого варианта LastModifiedFileListFilter для FTPFile.Нечто подобное у вас есть в вашем, но в виде отдельного фильтра.

Не уверен, однако, что вы имеете в виду, чтобы сделать это на основе триггера.Просто нет никакой связи между фильтром и триггером.Конечно, вы можете иметь какое-то общее свойство interval и настроить его в соответствии с последним измененным значением фильтра.

Кстати: эта ваша история ушла далеко от первоначального сразу запрос.Адаптер входящего канала на самом деле составляет один файл на сообщение , поэтому вы определенно не можете иметь список файлов в одном сообщении, как это возможно с FtpOutboundGateway и его LS или MGET команды, как я упоминал в комментариях ниже.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...