Постоянно отправляя Runnable задачи в ExecutorService, пока работа не будет завершена, получая java .util.concurrent.RejectedExecutionException - PullRequest
1 голос
/ 17 февраля 2020

Мой многопоточный класс должен выполнять три операции - operation1, operation2 и operation3 - для ряда объектов класса ClassA, где каждый тип операции зависит от ранняя операция. Для этого я попытался реализовать шаблон «производитель-потребитель», используя число BlockingQueue s и ExecutorService.

final ExecutorService executor = ForkJoinPool.commonPool();
final BlockingQueue<ClassA> operationOneQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> operationTwoQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> operationThreeQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> resultQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);

Операции реализованы так:

void doOperationOne() throws InterruptedException {
    ClassA objectA = operationOneQueue.take();
    objectA.operationOne();
    operationTwoQueue.put(objectA);
}

где каждый тип операции имеет свой собственный соответствующий метод со своими «собственными» входящими и выходящими в очередь. Каждый метод операции вызывает соответствующий метод для объекта ClassA. Метод doOperationThree помещает ClassA объекты в resultQueue, что означает, что они полностью обработаны.

Сначала я заполняю operationOneQueue всеми ClassA объектами, с которыми нужно работать. Затем я пытаюсь назначить исполняемые задачи для ExecutorService следующим образом:

    while (resultQueue.size() < NO_OF_CLASS_A_OBJECTS) {
        executor.execute(() -> {
            try {
                doOperationOne();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        executor.execute(() -> {
            try {
                doOperationTwo();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        executor.execute(() -> {
            try {
                doOperationThree();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
    executor.shutdown();

Запустив мою программу, я получаю java.util.concurrent.RejectedExecutionException.

    Operation1: ClassA object 0
    Operation2: ClassA object 0
    Operation1: ClassA object 1
    Operation3: ClassA object 0
    ....
    Operation1: ClassA object 46
    Operation2: ClassA object 45
    Operation3: ClassA object 45
    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Queue capacity exceeded
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.growArray(ForkJoinPool.java:912)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.lockedPush(ForkJoinPool.java:867)
at java.base/java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:1911)
at java.base/java.util.concurrent.ForkJoinPool.externalSubmit(ForkJoinPool.java:1930)
at java.base/java.util.concurrent.ForkJoinPool.execute(ForkJoinPool.java:2462)
at concurrent.operations.Program1.main(Program1.java:96)

Что я делаю не так? Как я могу добиться этого, не перенасыщая пул потоков?

Редактировать: Полное раскрытие - это домашнее задание с некоторыми требованиями. 1. Я должен использовать ForkJoinPool.commonPool() и не должен сам устанавливать количество потоков, 2. Я должен использовать шаблон потребитель-производитель и 3. Я не должен изменять ClassA.

1 Ответ

0 голосов
/ 20 февраля 2020

Мне действительно нравится делать параллельные вещи, поэтому я попробовал написать это. Я использовал CompletableFuture, который а) действительно запускается в ForkJoinPool.commonPool по умолчанию и б) действительно упрощает фактическую обработку:

while (true) {
    final ClassA nextOperation = queue.take();
    CompletableFuture.runAsync(nextOperation::operationOne)
        .thenRun(nextOperation::operationTwo)
        .thenRun(nextOperation::operationThree)
        .thenRun(() -> resultQueue.add(nextOperation));
}

Это будет принимать ClassA объектов из очереди и выполнять все их операции одновременно, но по порядку.

Вы не указали, откуда берутся задачи и нужно ли потребителю прекратить работу. Обычно вы этого не хотите, и это немного усложняет дело.

private static final int COUNT = 10;
private static final Random RANDOM = new Random();

public static void main(String[] args) throws ExecutionException, InterruptedException {
    BlockingQueue<ClassA> runnables = new ArrayBlockingQueue<>(COUNT);
    BlockingQueue<ClassA> finished = new ArrayBlockingQueue<>(COUNT);

    // start producer
    ExecutorService createTaskExecutor = Executors.newSingleThreadExecutor();
    createTaskExecutor.submit(() -> fillQueue(runnables));
    // wait for all consumer tasks to finish
    while (finished.size() != COUNT) {
        try {
            // we need to poll instead of waiting forever
            // because the last tasks might still be running
            // while there are no others to add anymore
            // so we need to check again if all have finished in the meantime
            final ClassA nextOperation = runnables.poll(2, TimeUnit.SECONDS);
            if (nextOperation != null) {
                CompletableFuture.runAsync(nextOperation::operationOne)
                        .thenRun(nextOperation::operationTwo)
                        .thenRun(nextOperation::operationThree)
                        .thenRun(() -> finished.add(nextOperation));
            }
        } catch (InterruptedException e) {
            System.err.println("exception while retrieving next operation");
            // we will actually need to terminate now, or probably never will
            throw e;
        }
    }
    System.out.printf("finished tasks (%d):%n", finished.size());
    for (ClassA classA : finished) {
        System.out.printf("finished task %d%n", classA.designator);
    }
    createTaskExecutor.shutdown();
}

private static void fillQueue(BlockingQueue<ClassA> runnables) {
    // start thread filling the queue at random
    for (int i = 0; i < COUNT; i++) {
        runnables.add(new ClassA(i));
        try {
            Thread.sleep(RANDOM.nextInt(1_000));
        } catch (InterruptedException e) {
            System.err.println("failed to add runnable");
        }
    }
}

Поскольку вы не предоставили ClassA, я использовал этот. Он содержит идентификатор, чтобы вы могли отслеживать, в какое время он работает.

class ClassA {
    private static final Random RANDOM = new Random();
    public final int designator;

    public ClassA(int i) {
        designator = i;
    }

    public void operationOne() {
        System.out.printf("%d: operation 1%n", designator);
        sleep();
    }

    public void operationTwo() {
        System.out.printf("%d: operation 2%n", designator);
        sleep();
    }

    public void operationThree() {
        System.out.printf("%d: operation 3%n", designator);
        sleep();
    }

    private static void sleep() {
        try {
            Thread.sleep(RANDOM.nextInt(5_000));
        } catch (InterruptedException e) {
            System.err.println("interrupted while executing task");
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...