Мой многопоточный класс должен выполнять три операции - operation1
, operation2
и operation3
- для ряда объектов класса ClassA
, где каждый тип операции зависит от ранняя операция. Для этого я попытался реализовать шаблон «производитель-потребитель», используя число BlockingQueue
s и ExecutorService
.
final ExecutorService executor = ForkJoinPool.commonPool();
final BlockingQueue<ClassA> operationOneQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> operationTwoQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> operationThreeQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> resultQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
Операции реализованы так:
void doOperationOne() throws InterruptedException {
ClassA objectA = operationOneQueue.take();
objectA.operationOne();
operationTwoQueue.put(objectA);
}
где каждый тип операции имеет свой собственный соответствующий метод со своими «собственными» входящими и выходящими в очередь. Каждый метод операции вызывает соответствующий метод для объекта ClassA
. Метод doOperationThree
помещает ClassA
объекты в resultQueue
, что означает, что они полностью обработаны.
Сначала я заполняю operationOneQueue
всеми ClassA
объектами, с которыми нужно работать. Затем я пытаюсь назначить исполняемые задачи для ExecutorService
следующим образом:
while (resultQueue.size() < NO_OF_CLASS_A_OBJECTS) {
executor.execute(() -> {
try {
doOperationOne();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
executor.execute(() -> {
try {
doOperationTwo();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
executor.execute(() -> {
try {
doOperationThree();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
Запустив мою программу, я получаю java.util.concurrent.RejectedExecutionException
.
Operation1: ClassA object 0
Operation2: ClassA object 0
Operation1: ClassA object 1
Operation3: ClassA object 0
....
Operation1: ClassA object 46
Operation2: ClassA object 45
Operation3: ClassA object 45
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Queue capacity exceeded
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.growArray(ForkJoinPool.java:912)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.lockedPush(ForkJoinPool.java:867)
at java.base/java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:1911)
at java.base/java.util.concurrent.ForkJoinPool.externalSubmit(ForkJoinPool.java:1930)
at java.base/java.util.concurrent.ForkJoinPool.execute(ForkJoinPool.java:2462)
at concurrent.operations.Program1.main(Program1.java:96)
Что я делаю не так? Как я могу добиться этого, не перенасыщая пул потоков?
Редактировать: Полное раскрытие - это домашнее задание с некоторыми требованиями. 1. Я должен использовать ForkJoinPool.commonPool()
и не должен сам устанавливать количество потоков, 2. Я должен использовать шаблон потребитель-производитель и 3. Я не должен изменять ClassA
.