Отказ от ответственности: я впервые использую среду Java Fork-Join, поэтому я не уверен на 100%, правильно ли я ее использую.Java также не является моим основным языком программирования, поэтому это также может иметь значение.
Учитывая следующее SSCCE :
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
class ForkCalculator extends RecursiveAction
{
private final Integer[] delayTasks;
public ForkCalculator(Integer[] delayTasks)
{
this.delayTasks = delayTasks;
}
@Override
protected void compute()
{
if (this.delayTasks.length == 1) {
this.computeDirectly();
return;
}
Integer halfway = this.delayTasks.length / 2;
ForkJoinTask.invokeAll(
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, 0, halfway)
),
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
)
);
}
private void computeDirectly()
{
Integer delayTask = this.delayTasks[0];
try {
Thread.sleep(delayTask);
} catch (InterruptedException ex) {
System.err.println(ex.getMessage());
System.exit(2);
}
System.out.println("Finished computing task with delay " + delayTask);
}
}
public final class ForkJoinBlocker
{
public static void main(String[] args)
{
ForkCalculator calculator = new ForkCalculator(
new Integer[]{1500, 1400, 1950, 2399, 4670, 880, 5540, 1975, 3010, 4180, 2290, 1940, 510}
);
ForkJoinPool pool = new ForkJoinPool(
Runtime.getRuntime().availableProcessors()
);
pool.invoke(calculator);
//make it a daemon thread
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(
new TimerTask() {
@Override
public void run()
{
System.out.println(pool.toString());
}
},
100,
2000
);
}
}
Итак, я создаю ForkJoinPool
, в который я представляю некоторые задачи, которые выполняют некоторую обработку.В целях этого примера я заменил их на Thread.sleep()
, чтобы упростить его.
В моей реальной программе это очень длинный список задач, поэтому я хочу периодически печатать текущее состояние настандартный вывод.Я пытаюсь сделать это в отдельном потоке, используя запланированный TimerTask
.
Однако я заметил нечто, чего не ожидал: в моем примере вывод выглядит примерно так:
Finished computing task with delay 1500
Finished computing task with delay 2399
Finished computing task with delay 1400
Finished computing task with delay 4180
Finished computing task with delay 1950
Finished computing task with delay 5540
Finished computing task with delay 880
.......
Это означает, что «статус-задача» никогда не выполняется.
Однако, если я изменю свой код, чтобы переместить pool.invoke(calculator);
в самом конце, он будет работать, как и ожидалось:
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 1500
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 2399
Finished computing task with delay 1400
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 4, submissions = 0]
Finished computing task with delay 4180
Finished computing task with delay 1950
......
Единственный вывод, который я могу сделать, состоит в том, что ForkJoinPool::invoke()
блокирует основной поток (он возвращает только ПОСЛЕ того, как все задачи в пуле завершены).
Я ожидал продолжения кода в основном потокедля выполнения, в то время как задачи в fork-join-pool обрабатываются асинхронно .
Мой вопрос : это происходит из-за того, что я неправильно использовал фреймворк?Есть ли что-то, что я должен исправить в своем коде?
Я заметил, что один из конструкторов ForkJoinPool
s имеет параметр boolean asyncMode
, но, насколько я могу судить из реализации, это просто выбор между FIFO_QUEUE
и LIFO_QUEUE
режимы выполнения (точно не знаю, что это такое):
public ForkJoinPool(
int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode
) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}