ThreadPoolTaskExecutor создает исключение RejectedExecutionException - PullRequest
0 голосов
/ 13 января 2020

Я хочу реализовать следующее поведение:

  1. Чтение n событий из файла
  2. Обработка их в потоках
  3. Go вернуться к шагу 1, если есть события остаются

Я написал следующее приложение для тестирования решения, но оно не работает в случайный момент, например.

java.lang.IllegalStateException: Failed to execute CommandLineRunner
Caused by: org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@62b3df3a[Running, pool size = 5, active threads = 4, queued tasks = 0, completed tasks = 70]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@71ea1fda

Какую емкость очереди я должен установить, если я не хочу ставить события в очереди? Я хочу немедленно обработать их.

Я использую Open JDK 11 и Spring Boot 2.2.2.RELEASE

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

    @Autowired
    private ThreadPoolTaskExecutor eventExecutor;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean(name = "eventExecutor")
    public ThreadPoolTaskExecutor eventExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(5);
        pool.setMaxPoolSize(5);
        pool.setQueueCapacity(0);
        pool.setAwaitTerminationSeconds(0);
        pool.initialize();
        return pool;
    }

    @Override
    public void run(String... args) {
        System.out.println("Start events processing");
        long start = System.currentTimeMillis();

        int result = 0;
        for (int i = 0; i < 100; i++) {
            List<CompletableFuture<Integer>> completableFutures = getEvents(5).stream()
                    .map(event -> CompletableFuture.supplyAsync(() -> processEvent(event), eventExecutor))
                    .collect(Collectors.toList());

            result += completableFutures.stream()
                    .mapToInt(CompletableFuture::join)
                    .sum();
        }

        long timeMillis = System.currentTimeMillis() - start;

        System.out.println("Took " + timeMillis + "ms, " + result);
    }

    private List<Event> getEvents(int n) {
        List<Event> events = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            events.add(new Event(i));
        }
        return events;
    }

    private int processEvent(Event event) {
        System.out.println("processing event " + event.id);

        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("processing event " + event.id + " finished");

        return 1;
    }

    private static class Event {

        private int id;

        private Event(int id) {
            this.id = id;
        }
    }
}

1 Ответ

0 голосов
/ 13 января 2020

Я предлагаю изменить pool.setQueueCapacity(0), чтобы использовать положительное значение, чтобы позволить задачам ставиться в очередь для обработки, когда нет потоков, доступных в так настроенном пуле фиксированного размера (corePoolSize == maxPoolSize == 5 ).

Вывод "размер пула = 5, активных потоков = 4" показывает приблизительное число активных потоков .

Теоретически это может случается, что потоки не возвращаются обратно в пул до попытки обработки нового пакета событий.

...