Я хочу реализовать следующее поведение:
- Чтение n событий из файла
- Обработка их в потоках
- Go вернуться к шагу 1, если есть события остаются
Я написал следующее приложение для тестирования решения, но оно не работает в случайный момент, например.
java.lang.IllegalStateException: Failed to execute CommandLineRunner
Caused by: org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@62b3df3a[Running, pool size = 5, active threads = 4, queued tasks = 0, completed tasks = 70]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@71ea1fda
Какую емкость очереди я должен установить, если я не хочу ставить события в очереди? Я хочу немедленно обработать их.
Я использую Open JDK 11 и Spring Boot 2.2.2.RELEASE
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@SpringBootApplication
public class DemoApplication implements CommandLineRunner {
@Autowired
private ThreadPoolTaskExecutor eventExecutor;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean(name = "eventExecutor")
public ThreadPoolTaskExecutor eventExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(5);
pool.setMaxPoolSize(5);
pool.setQueueCapacity(0);
pool.setAwaitTerminationSeconds(0);
pool.initialize();
return pool;
}
@Override
public void run(String... args) {
System.out.println("Start events processing");
long start = System.currentTimeMillis();
int result = 0;
for (int i = 0; i < 100; i++) {
List<CompletableFuture<Integer>> completableFutures = getEvents(5).stream()
.map(event -> CompletableFuture.supplyAsync(() -> processEvent(event), eventExecutor))
.collect(Collectors.toList());
result += completableFutures.stream()
.mapToInt(CompletableFuture::join)
.sum();
}
long timeMillis = System.currentTimeMillis() - start;
System.out.println("Took " + timeMillis + "ms, " + result);
}
private List<Event> getEvents(int n) {
List<Event> events = new ArrayList<>();
for (int i = 1; i <= n; i++) {
events.add(new Event(i));
}
return events;
}
private int processEvent(Event event) {
System.out.println("processing event " + event.id);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("processing event " + event.id + " finished");
return 1;
}
private static class Event {
private int id;
private Event(int id) {
this.id = id;
}
}
}