У меня есть файл настройки с программой-исполнителем
ExecutorService executorService = Executors.newFixedThreadPool(10);
LOG.info("Setting up the poller for directory {} ", finalDirectory);
StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
.taskExecutor(executorService)
.maxMessagesPerPoll(10)
.advice(new LoggerSourceAdvisor(finalDirectory))
))
//move file to processing first processing
.transform(new FileMoveTransformer("C:/processing", true))
.channel("fileRouter")
.get();
Как видно, у меня есть фиксированная установка threadpool
из 10 и максимальное сообщение 10 на опрос.Если я поставлю 10 файлов, он все еще обрабатывает один за другимЧто может быть здесь не так?
* ОБНОВЛЕНИЕ *
После ответа Гари все работает отлично, хотя у меня сейчас другая проблема.
У меня есть настройкаМой Poller вот так
setDirectory(new File(path));
DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();
scanner.setFilter(new AcceptAllFileListFilter<>());
setScanner(scanner);
Причина использования AcceptAll
, потому что тот же файл может появиться снова, поэтому я сначала как бы перемещаю файл.Но когда я включаю исполнитель потока, тот же файл обрабатывается несколькими потоками, я предполагаю, что из-за AcceptAllFile
Если я изменяю на AcceptOnceFileListFilter
, это работает, но тогда тот же самый файл, который приходит снова, не будетвзял снова!Что можно сделать, чтобы избежать этой проблемы?
Проблема / ошибка
В классе AbstractPersistentAcceptOnceFileListFilter
У нас есть код
@Override
public boolean accept(F file) {
String key = buildKey(file);
synchronized (this.monitor) {
String newValue = value(file);
String oldValue = this.store.putIfAbsent(key, newValue);
if (oldValue == null) { // not in store
flushIfNeeded();
return true;
}
// same value in store
if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
flushIfNeeded();
return true;
}
return false;
}
}
Теперь дляНапример, если у меня настроен максимум на опрос 5 и есть два файла, то один и тот же файл может быть подхвачен двумя потоками.
Допустим, мой код перемещает файлы после того, как я его прочитал.
Но другой поток переходит к методу accept
, если файла нет, тогда он вернет время lastModified как 0 и вернет true.
Товызывает проблему, поскольку файл НЕ существует.
Если его значение равно 0, он должен вернуть значение false, поскольку файла больше нет.