Конечная цель пулов потоков и Fork / Join схожи: оба хотят максимально использовать доступную мощность ЦП для максимальной пропускной способности.Максимальная пропускная способность означает, что как можно больше задач должно быть выполнено в течение длительного периода времени.Что для этого нужно?(В дальнейшем мы будем предполагать, что нет недостатка в вычислительных задачах: всегда достаточно сделать для 100% загрузки ЦП. Кроме того, я использую «ЦП» для ядер или виртуальных ядер эквивалентно в случае гиперпоточности).
- По крайней мере, должно быть столько потоков, сколько имеется доступных процессоров, потому что при меньшем количестве потоков ядро не будет использоваться.
- Максимально должно быть столько потоков, сколько тамдоступны процессоры, потому что запуск большего количества потоков создаст дополнительную нагрузку для планировщика, который назначает процессоры различным потокам, что заставляет некоторое время процессора переходить к планировщику, а не к нашей вычислительной задаче.
Таким образом, мы решили, чтоиз-за того, что для максимальной пропускной способности нам нужно иметь одинаковое количество потоков, чем у процессоров.В примере размытия Oracle вы можете взять пул потоков фиксированного размера с количеством потоков, равным количеству доступных процессоров, или использовать пул потоков.Это не имеет значения, вы правы!
Так когда же у вас возникнут проблемы с пулами потоков?Это если поток блокирует , потому что ваш поток ожидает завершения другой задачи.Предположим следующий пример:
class AbcAlgorithm implements Runnable {
public void run() {
Future<StepAResult> aFuture = threadPool.submit(new ATask());
StepBResult bResult = stepB();
StepAResult aResult = aFuture.get();
stepC(aResult, bResult);
}
}
Здесь мы видим алгоритм, который состоит из трех этапов A, B и C. A и B могут выполняться независимо друг от друга, но этап C требует результатаШаг A и B. Этот алгоритм выполняет передачу задачи A в пул потоков и выполнение задачи b напрямую.После этого поток будет ожидать выполнения задачи A и продолжит выполнение шага C. Если A и B завершены одновременно, то все в порядке.Но что, если A займет больше времени, чем B?Это может быть связано с тем, что природа задачи A диктует это, но это также может быть связано с тем, что вначале нет задачи для задачи A, а задача A должна ждать.(Если доступен только один процессор и, следовательно, в вашем пуле потоков есть только один поток, это даже вызовет взаимоблокировку, но на данный момент это не имеет значения).Дело в том, что поток, который только что выполнил задачу B , блокирует весь поток .Поскольку у нас столько же потоков, сколько и у процессоров, и один поток заблокирован, это означает, что один процессор простаивает .
Fork / Join решает эту проблему: в среде fork / join вы 'd написать тот же алгоритм, как показано ниже:
class AbcAlgorithm implements Runnable {
public void run() {
ATask aTask = new ATask());
aTask.fork();
StepBResult bResult = stepB();
StepAResult aResult = aTask.join();
stepC(aResult, bResult);
}
}
Выглядит так же, не так ли?Однако подсказка в том, что aTask.join
не будет блокировать .Вместо этого вот где похищение работы вступает в игру: Поток будет искать другие задачи, которые были отменены в прошлом, и продолжит их.Сначала он проверяет, начали ли обрабатываться задачи, которые он разветвлял.Поэтому, если A еще не был запущен другим потоком, он будет делать A следующим, в противном случае он проверит очередь других потоков и украдет их работу.Как только эта другая задача другого потока будет завершена, она проверит, завершена ли сейчас A.Если это вышеуказанный алгоритм, можно вызвать stepC
.В противном случае он будет искать еще одну задачу, чтобы украсть.Таким образом, пулы разветвления / объединения могут достичь 100% загрузки ЦП, даже несмотря на блокирующие действия .
Однако существует ловушка: кража работы возможна только для вызова join
ForkJoinTask
с.Это невозможно сделать для внешних блокирующих действий, таких как ожидание другого потока или ожидание действия ввода-вывода.Так что же, ожидание завершения ввода-вывода - обычная задача?В этом случае, если бы мы могли добавить дополнительный поток в пул Fork / Join, который будет снова остановлен, как только будет выполнено блокирующее действие, это будет вторым лучшим решением.И ForkJoinPool
может действительно сделать это, если мы используем ManagedBlocker
с.
Фибоначчи
В JavaDoc для RecursiveTask является примером для вычисления чисел Фибоначчи с использованием Fork / Join. Для классического рекурсивного решения см .:
public static int fib(int n) {
if (n <= 1) {
return n;
}
return fib(n - 1) + fib(n - 2);
}
Как объясняется в JavaDocs, это довольно простой способ вычисления чисел Фибоначчи, поскольку этот алгоритм имеет сложность O (2 ^ n), хотя возможны и более простые способы. Однако этот алгоритм очень прост и легок для понимания, поэтому мы придерживаемся его. Давайте предположим, что мы хотим ускорить это с помощью Fork / Join. Наивная реализация выглядела бы так:
class Fibonacci extends RecursiveTask<Long> {
private final long n;
Fibonacci(long n) {
this.n = n;
}
public Long compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
Шаги, на которые разбивается эта Задача, слишком коротки, и поэтому они будут работать ужасно, но вы можете видеть, как каркас в целом работает очень хорошо: два слагаемых можно вычислить независимо, но тогда нам нужно, чтобы они оба построить окончательный результат. Так что одна половина делается в другой теме. Получайте удовольствие, делая то же самое с пулами потоков без тупиковой ситуации (возможно, но не так просто).
Просто для полноты: если вы действительно хотите рассчитать числа Фибоначчи, используя этот рекурсивный подход, то вот оптимизированная версия:
class FibonacciBigSubtasks extends RecursiveTask<Long> {
private final long n;
FibonacciBigSubtasks(long n) {
this.n = n;
}
public Long compute() {
return fib(n);
}
private long fib(long n) {
if (n <= 1) {
return 1;
}
if (n > 10 && getSurplusQueuedTaskCount() < 2) {
final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
f1.fork();
return f2.compute() + f1.join();
} else {
return fib(n - 1) + fib(n - 2);
}
}
}
Благодаря этому подзадачи значительно уменьшаются, потому что они разделяются только в том случае, если истинно n > 10 && getSurplusQueuedTaskCount() < 2
, что означает, что существует значительно более 100 вызовов методов (n > 10
) и не очень сложные задачи уже ожидают (getSurplusQueuedTaskCount() < 2
).
На моем компьютере (4 ядра (8 при подсчете Hyper-Threading), Intel (R) Core (TM) i7-2720QM CPU @ 2,20 ГГц) fib(50)
занимает 64 секунды с классическим подходом и всего 18 секунд с подход Fork / Join, который является довольно заметным преимуществом, хотя и не настолько теоретически возможным.
Резюме
- Да, в вашем примере Fork / Join не имеет преимущества перед классическими пулами потоков.
- Fork / Join может значительно улучшить производительность при блокировке
- Fork / Join позволяет обойти некоторые тупиковые проблемы