Почему ForkJoinPool :: invoke () блокирует основной поток? - PullRequest
0 голосов
/ 01 октября 2018

Отказ от ответственности: я впервые использую среду Java Fork-Join, поэтому я не уверен на 100%, правильно ли я ее использую.Java также не является моим основным языком программирования, поэтому это также может иметь значение.


Учитывая следующее SSCCE :

import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

class ForkCalculator extends RecursiveAction
{
    private final Integer[] delayTasks;

    public ForkCalculator(Integer[] delayTasks)
    {
        this.delayTasks = delayTasks;
    }

    @Override
    protected void compute()
    {
        if (this.delayTasks.length == 1) {
            this.computeDirectly();
            return;
        }

        Integer halfway = this.delayTasks.length / 2;

        ForkJoinTask.invokeAll(
            new ForkCalculator(
                Arrays.copyOfRange(this.delayTasks, 0, halfway)
            ),
            new ForkCalculator(
                Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
            )
        );
    }

    private void computeDirectly()
    {
        Integer delayTask = this.delayTasks[0];

        try {
            Thread.sleep(delayTask);
        } catch (InterruptedException ex) {
            System.err.println(ex.getMessage());
            System.exit(2);
        }

        System.out.println("Finished computing task with delay " + delayTask);
    }
}

public final class ForkJoinBlocker
{
    public static void main(String[] args)
    {
        ForkCalculator calculator = new ForkCalculator(
            new Integer[]{1500, 1400, 1950, 2399, 4670, 880, 5540, 1975, 3010, 4180, 2290, 1940, 510}
        );

        ForkJoinPool pool = new ForkJoinPool(
            Runtime.getRuntime().availableProcessors()
        );

        pool.invoke(calculator);

        //make it a daemon thread
        Timer timer = new Timer(true);

        timer.scheduleAtFixedRate(
            new TimerTask() {
                @Override
                public void run()
                {
                    System.out.println(pool.toString());
                }
            },
            100,
            2000
        );
    }
}

Итак, я создаю ForkJoinPool, в который я представляю некоторые задачи, которые выполняют некоторую обработку.В целях этого примера я заменил их на Thread.sleep(), чтобы упростить его.

В моей реальной программе это очень длинный список задач, поэтому я хочу периодически печатать текущее состояние настандартный вывод.Я пытаюсь сделать это в отдельном потоке, используя запланированный TimerTask.

Однако я заметил нечто, чего не ожидал: в моем примере вывод выглядит примерно так:

Finished computing task with delay 1500
Finished computing task with delay 2399
Finished computing task with delay 1400
Finished computing task with delay 4180
Finished computing task with delay 1950
Finished computing task with delay 5540
Finished computing task with delay 880
.......

Это означает, что «статус-задача» никогда не выполняется.

Однако, если я изменю свой код, чтобы переместить pool.invoke(calculator); в самом конце, он будет работать, как и ожидалось:

java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 1500
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 2399
Finished computing task with delay 1400
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 4, submissions = 0]
Finished computing task with delay 4180
Finished computing task with delay 1950
......

Единственный вывод, который я могу сделать, состоит в том, что ForkJoinPool::invoke() блокирует основной поток (он возвращает только ПОСЛЕ того, как все задачи в пуле завершены).

Я ожидал продолжения кода в основном потокедля выполнения, в то время как задачи в fork-join-pool обрабатываются асинхронно .

Мой вопрос : это происходит из-за того, что я неправильно использовал фреймворк?Есть ли что-то, что я должен исправить в своем коде?

Я заметил, что один из конструкторов ForkJoinPool s имеет параметр boolean asyncMode, но, насколько я могу судить из реализации, это просто выбор между FIFO_QUEUE и LIFO_QUEUE режимы выполнения (точно не знаю, что это такое):

public ForkJoinPool(
    int parallelism,
    ForkJoinWorkerThreadFactory factory,
    UncaughtExceptionHandler handler,
    boolean asyncMode
) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}

1 Ответ

0 голосов
/ 01 октября 2018

В основном invoke() будет ждать завершения всей задачи, прежде чем вернуться, так что да, основной поток блокируется.После этого Timer не успевает выполнить, потому что он выполняется в потоке демона.

Вы можете просто использовать execute() вместо invoke(), который выполняет задачу асинхронно.Затем вы можете join() на ForkJoinTask дождаться результата, в течение которого будет работать Timer:

ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.execute(calculator);

    //make it a daemon thread
Timer timer = new Timer(true);

timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            System.out.println(pool.toString());
        }
    }, 100, 2000);

calculator.join(); // wait for computation
...