Я пытаюсь получить количество файлов с моим фильтром из моего потокового апдатера ftp, поэтому после обработки всех файлов я хочу запустить удаленную оболочку или есть какой-либо другой способ узнать, что адаптер завершил отправку сообщения
Я уже пытался с CompositeFileListFilter переопределить общедоступный метод List filterFiles (F [] files), но он никогда не вызывается.
на данный момент я использую фиксированное количество файлов, но оно должно быть динамическим.
Я сделал переопределение этого метода в CompositeFileListFilter
@Override
public List<F> filterFiles(F[] files) {
log.info("received {} files", files.length);
return super.filterFiles(files);
}
У меня есть следующий поток интеграции, использующий атомный счетчик до 3, он должен быть 3.:
AtomicInteger messageCounter = new AtomicInteger(0);
return IntegrationFlows.from(Ftp.inboundStreamingAdapter(goldv5template())
.remoteDirectory("/inputFolder")
.filter(new CompositeFileListFilterWithCount<>() {{
addFilter(new FtpSimplePatternFileListFilter("pattern1.*"));
addFilter(new FtpSimplePatternFileListFilter("pattern2.*"));
addFilter(new FtpSimplePatternFileListFilter("pattern3.*"));
}})
, pollerConfiguration)
.transform(Transformers.fromStream(StandardCharsets.UTF_8.toString()))
.log(message -> "process file " + message.getHeaders().get(FileHeaders.REMOTE_FILE))
.handle(message -> {
int numericValue = messageCounter.incrementAndGet();
log.info("numeric value: {}", numericValue);
if (numericValue == 3) {
messageCounter.set(0);
log.info("launch remote shell here now"));
}
}, e -> e.advice(after()))
.get();
если я не использую счетчик, я получу удаленный вызов оболочки для каждого файла, и мне нужно, чтобы он вызывался только один раз, только после завершения потока, это запланировано на основе cronjob, поэтому я хочу вызвать это только один раз в конце.
Я использую фиксированную задержку в 1 с для теста, но он будет запускаться только три раза в день, мне нужно выбирать все время на всех часах.
это мой poller Конфигурация для теста:
sourcePollingChannelAdapterSpec -> sourcePollingChannelAdapterSpec.poller(pollerFactory -> pollerFactory.fixedRate(1000L))
UPDATE
Я попробовал то, что было предложено Артемом, но у меня странное поведение, я пытаюсь получить все файлы в определенной папке ftp в одном опросе, поэтому читаю документы:
если для параметра max-messages-per-poll установлено значение 1 (по умолчанию), он обрабатывает только один файл за раз с интервалами, определенными вашим триггером, по сути, работая как «one-poll === one-file». ».
В типичных случаях использования передачи файлов вам, скорее всего, понадобится противоположное поведение: обрабатывать все файлы, которые вы можете для каждого опроса, и только потом ждать следующего опроса. Если это так, задайте для max-messages-per-poll значение -1. Затем при каждом опросе адаптер пытается создать столько сообщений, сколько возможно ...
поэтому я установил для max-message-per-poll значение -1, поэтому каждый опрос дает мне каждый файл.
Я добавил фильтр в только для файлов .xml и для предотвращения дублирования, acceptOnceFilter, но потоковый адаптер ftp предоставляет мне неограниченное количество раз тех же файлов, что не имеет смысла, я использовал для этого теста Исправлена задержка 10 с.
2019-07-23 10:32:04.308 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process2 file sample1.xml
2019-07-23 10:32:04.312 INFO 9008 --- [ scheduling-1] o.s.integration.ftp.session.FtpSession : File has been successfully transferred to: /output/sample1.xml
2019-07-23 10:32:04.313 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : Advice after handle.
2019-07-23 10:32:04.313 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : ________________________________
2019-07-23 10:32:04.315 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process file sample2.xml
2019-07-23 10:32:04.324 INFO 9008 --- [ scheduling-1] o.s.integration.ftp.session.FtpSession : File has been successfully transferred to: /output/sample2.xml
2019-07-23 10:32:04.324 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : Advice after handle.
2019-07-23 10:32:04.324 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : ________________________________
2019-07-23 10:32:04.326 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process file sample3.xml
2019-07-23 10:32:04.330 INFO 9008 --- [ scheduling-1] o.s.integration.ftp.session.FtpSession : File has been successfully transferred to: /output/sample3.xml
2019-07-23 10:32:04.331 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : Advice after handle.
2019-07-23 10:32:04.331 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : ________________________________
2019-07-23 10:32:04.333 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process file sample4.xml
2019-07-23 10:32:04.337 INFO 9008 --- [ scheduling-1] o.s.integration.ftp.session.FtpSession : File has been successfully transferred to: /output/sample4.xml
2019-07-23 10:32:04.338 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : Advice after handle.
2019-07-23 10:32:04.338 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : ________________________________
2019-07-23 10:32:04.341 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process file sample1.xml
2019-07-23 10:32:04.345 INFO 9008 --- [ scheduling-1] o.s.integration.ftp.session.FtpSession : File has been successfully transferred to: /output/sample1.xml
2019-07-23 10:32:04.346 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : Advice after handle.
2019-07-23 10:32:04.346 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : ________________________________
2019-07-23 10:32:04.347 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process file sample2.xml
2019-07-23 10:32:04.351 INFO 9008 --- [ scheduling-1] o.s.integration.ftp.session.FtpSession : File has been successfully transferred to: /output/sample2.xml
2019-07-23 10:32:04.351 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : Advice after handle.
2019-07-23 10:32:04.351 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : ________________________________
2019-07-23 10:32:04.353 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process file sample3.xml
2019-07-23 10:32:04.356 INFO 9008 --- [ scheduling-1] o.s.integration.ftp.session.FtpSession : File has been successfully transferred to: /output/sample3.xml
2019-07-23 10:32:04.356 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : Advice after handle.
2019-07-23 10:32:04.357 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : ________________________________
2019-07-23 10:32:04.358 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process file sample4.xml
...............................
return IntegrationFlows
.from(Ftp.inboundStreamingAdapter(testFlowTemplate())
.remoteDirectory("/inputTestFlow")
.filter(new CompositeFileListFilter<>() {{
addFilter(new AcceptOnceFileListFilter<>());
addFilter(new FtpSimplePatternFileListFilter("*.xml"));
}})
, sourcePollingChannelAdapterSpec -> sourcePollingChannelAdapterSpec.poller(pollerConfiguration.maxMessagesPerPoll(-1)))
.transform(Transformers.fromStream(StandardCharsets.UTF_8.toString()))
.log(message -> {
execution.setStartDate(new Date());
return "process file " + message.getHeaders().get(FileHeaders.REMOTE_FILE);
})
.handle(Ftp.outboundAdapter(FTPServers.PC_LOCAL.getFactory(), FileExistsMode.REPLACE)
.useTemporaryFileName(false)
.fileNameExpression("headers['" + FileHeaders.REMOTE_FILE + "']")
.remoteDirectory("/output/")
, e -> e.advice(testFlowAfter())
)
.get();
Обновление 2
Я достиг того, что мне нужно, создав этот пользовательский фильтр:
.filter(new FileListFilter<>() {
private final Set<String> seenSet = new HashSet<>();
private Date lastExecution;
@Override
public List<FTPFile> filterFiles(FTPFile[] files) {
return Arrays.stream(files).filter(ftpFile -> {
if (lastExecution!= null && TimeUnit.MILLISECONDS.toSeconds(new Date().getTime() - lastExecution.getTime()) >= 10L) {
this.seenSet.clear();
}
lastExecution = new Date();
if (ftpFile.getName().endsWith(".xml")) {
return this.seenSet.add(ftpFile.getRawListing());
}
return false;
}).collect(Collectors.toList());
}
})
но я использовал ручной 10-секундный интервал, который подходит для моих нужд, какой-нибудь другой умный способ улучшить этот код в зависимости от триггера?