TaskExecutor не работает Spring Integration - PullRequest
0 голосов
/ 28 ноября 2018

У меня есть файл настройки с программой-исполнителем

ExecutorService executorService = Executors.newFixedThreadPool(10);

            LOG.info("Setting up the poller for directory {} ", finalDirectory);
            StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
                    c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
                            .taskExecutor(executorService)
                            .maxMessagesPerPoll(10)
                            .advice(new LoggerSourceAdvisor(finalDirectory))
                    ))


                    //move file to processing first processing                    
                    .transform(new FileMoveTransformer("C:/processing", true))
                    .channel("fileRouter")
                    .get();

Как видно, у меня есть фиксированная установка threadpool из 10 и максимальное сообщение 10 на опрос.Если я поставлю 10 файлов, он все еще обрабатывает один за другимЧто может быть здесь не так?

* ОБНОВЛЕНИЕ *

После ответа Гари все работает отлично, хотя у меня сейчас другая проблема.

У меня есть настройкаМой Poller вот так

setDirectory(new File(path));
        DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();

        scanner.setFilter(new AcceptAllFileListFilter<>());
        setScanner(scanner);

Причина использования AcceptAll, потому что тот же файл может появиться снова, поэтому я сначала как бы перемещаю файл.Но когда я включаю исполнитель потока, тот же файл обрабатывается несколькими потоками, я предполагаю, что из-за AcceptAllFile

Если я изменяю на AcceptOnceFileListFilter, это работает, но тогда тот же самый файл, который приходит снова, не будетвзял снова!Что можно сделать, чтобы избежать этой проблемы?

Проблема / ошибка

В классе AbstractPersistentAcceptOnceFileListFilter У нас есть код

@Override
    public boolean accept(F file) {
        String key = buildKey(file);
        synchronized (this.monitor) {
            String newValue = value(file);
            String oldValue = this.store.putIfAbsent(key, newValue);
            if (oldValue == null) { // not in store
                flushIfNeeded();
                return true;
            }
            // same value in store
            if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
                flushIfNeeded();
                return true;
            }
            return false;
        }
    }

Теперь дляНапример, если у меня настроен максимум на опрос 5 и есть два файла, то один и тот же файл может быть подхвачен двумя потоками.

Допустим, мой код перемещает файлы после того, как я его прочитал.

Но другой поток переходит к методу accept

, если файла нет, тогда он вернет время lastModified как 0 и вернет true.

Товызывает проблему, поскольку файл НЕ существует.

Если его значение равно 0, он должен вернуть значение false, поскольку файла больше нет.

1 Ответ

0 голосов
/ 28 ноября 2018

При добавлении исполнителя задач в опросчик;все, что делает, - это поток планировщика передает задачу опроса потоку в пуле потоков;maxMessagesPerPoll является частью задачи опроса.Сам опросер запускается только раз в 5 секунд.Чтобы получить то, что вы хотите, вы должны добавить в поток канал исполнителя ...

@SpringBootApplication
public class So53521593Application {

    private static final Logger logger = LoggerFactory.getLogger(So53521593Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So53521593Application.class, args);
    }

    @Bean
    public IntegrationFlow flow() {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        return IntegrationFlows.from(() -> "foo", e -> e
                    .poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                            .maxMessagesPerPoll(10)))
                .channel(MessageChannels.executor(exec))
                .<String>handle((p, h) -> {
                    try {
                        logger.info(p);
                        Thread.sleep(10_000);
                    }
                    catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                    }
                    return null;
                })
                .get();
    }
}

РЕДАКТИРОВАТЬ

Это прекрасно работает для меня ...

@Bean
public IntegrationFlow flow() {
    ExecutorService exec = Executors.newFixedThreadPool(10);
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
                new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
                    e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                            .maxMessagesPerPoll(10)))
            .channel(MessageChannels.executor(exec))
            .handle((p, h) -> {
                try {
                    logger.info(p.toString());
                    Thread.sleep(10_000);
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                }
                return null;
            })
            .get();
}

и

2018-11-28 11: 46: 05.196 INFO 57607 --- [pool-1-thread-1] com.example.So53521593Application: / tmp /foo / test1.txt

2018-11-28 11: 46: 05.197 INFO 57607 --- [pool-1-thread-2] com.example.So53521593Применение: /tmp/foo/test2.txt

и с touch test1.txt

2018-11-28 11: 48: 00.284 INFO 57607 --- [pool-1-thread-3] com.example.So53521593Применение: /tmp/foo/test1.txt

EDIT1

Согласовано - воспроизведено с этим ...

@Bean
public IntegrationFlow flow() {
    ExecutorService exec = Executors.newFixedThreadPool(10);
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
                new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
                    e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                            .maxMessagesPerPoll(10)))
            .channel(MessageChannels.executor(exec))
            .<File>handle((p, h) -> {
                try {
                    p.delete();
                    logger.info(p.toString());
                    Thread.sleep(10_000);
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                }
                return null;
            })
            .get();
}

и

2018-11-28 13: 22: 23,689 INFO 75681 --- [pool-1-thread-1] com.example.So53521593Применение: /tmp/foo/test1.txt

2018-11-28 13: 22: 23.690 INFO 75681 --- [pool-1-thread-2] com.example.So53521593Применение: /tmp/foo/test2.txt

2018-11-28 13: 22: 23,690 INFO 75681 --- [pool-1-thread-3] com.example.So53521593Применение: /tmp/foo/test1.txt

2018-11-28 13: 22: 23.690 INFO 75681 --- [pool-1-thread-4] com.example.So53521593Применение: /tmp/foo/test2.txt

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...