Чем фреймворк fork / join лучше, чем пул потоков? - PullRequest
123 голосов
/ 28 октября 2011

Каковы преимущества использования новой платформы fork / join по сравнению с простым разбиением большой задачи на N подзадач в начале, отправкой их в пул кэшированных потоков (из Executors ) и ждете завершения каждого задания? Я не вижу, как использование абстракции fork / join упрощает проблему или делает решение более эффективным по сравнению с тем, что было у нас годами.

Например, алгоритм распараллеленного размытия в учебном примере может быть реализован так:

public class Blur implements Runnable {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    private int mBlurWidth = 15; // Processing window size, should be odd.

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    public void run() {
        computeDirectly();
    }

    protected void computeDirectly() {
        // As in the example, omitted for brevity
    }
}

Разделить в начале и отправить задачи в пул потоков:

// source image pixels are in src
// destination image pixels are in dst
// threadPool is a (cached) thread pool

int maxSize = 100000; // analogous to F-J's "sThreshold"
List<Future> futures = new ArrayList<Future>();

// Send stuff to thread pool:
for (int i = 0; i < src.length; i+= maxSize) {
    int size = Math.min(maxSize, src.length - i);
    ForkBlur task = new ForkBlur(src, i, size, dst);
    Future f = threadPool.submit(task);
    futures.add(f);
}

// Wait for all sent tasks to complete:
for (Future future : futures) {
    future.get();
}

// Done!

Задачи попадают в очередь пула потоков, из которой они выполняются по мере доступности рабочих потоков. Пока разделение достаточно гранулировано (чтобы избежать особого ожидания последней задачи) и в пуле потоков достаточно (по крайней мере, N процессоров) потоков, все процессоры работают на полной скорости, пока не будут выполнены все вычисления.

Я что-то упустил? Какова дополнительная ценность использования структуры fork / join?

Ответы [ 11 ]

128 голосов
/ 28 октября 2011

Я думаю, что основное недоразумение заключается в том, что примеры Fork / Join НЕ показывают работу воровство , но являются лишь неким стандартным разделением и завоеванием.

Воровство работы было бы так: работник Б закончил свою работу. Он добрый, поэтому он оглядывается и видит, что работник А все еще очень усердно работает. Он подходит и спрашивает: «Эй, парень, я мог бы помочь тебе». Ответы. «Круто, у меня есть задание в 1000 единиц. Пока я закончил 345, оставив 655. Не могли бы вы поработать с номерами с 673 по 1000, я сделаю с 346 по 672». Б говорит: «Хорошо, давайте начнем, чтобы мы могли пойти в паб раньше».

Видите ли, рабочие должны общаться друг с другом, даже когда они начали настоящую работу. Это недостающая часть в примерах.

Примеры, приведенные с другой стороны, показывают только что-то вроде «использовать субподрядчиков»:

Работник А: «Черт, у меня 1000 единиц работы. Слишком много для меня. Я сам сделаю 500 и передам 500 другим подрядчикам». Это продолжается до тех пор, пока большая задача не будет разбита на маленькие пакеты по 10 единиц в каждом. Они будут выполнены доступными работниками. Но если один пакет представляет собой отравленную таблетку и занимает значительно больше времени, чем другие пакеты - не повезло, фаза разделения закончена.

Единственное оставшееся различие между Fork / Join и предварительным разделением задачи заключается в следующем: при предварительном разделении рабочая очередь заполняется с самого начала. Пример: 1000 единиц, порог 10, поэтому в очереди 100 записей. Эти пакеты распределяются между членами пула потоков.

Fork / Join более сложен и пытается уменьшить количество пакетов в очереди:

  • Шаг 1: Поместить один пакет, содержащий (1 ... 1000) в очередь
  • Шаг 2: Один рабочий извлекает пакет (1 ... 1000) и заменяет его двумя пакетами: (1 ... 500) и (501 ... 1000).
  • Шаг 3: Один рабочий выталкивает пакет (500 ... 1000) и нажимает (500 ... 750) и (751 ... 1000).
  • Шаг n: стек содержит следующие пакеты: (1..500), (500 ... 750), (750 ... 875) ... (991..1000)
  • Шаг n + 1: пакет (991..1000) извлечен и выполнен
  • Шаг n + 2: пакет (981..990) извлечен и выполнен
  • Шаг n + 3: Пакет (961..980) всплывает и разделяется на (961 ... 970) и (971..980). ....

Вы видите: в Fork / Join очередь меньше (в примере 6), а фазы «split» и «work» чередуются.

Когда несколько рабочих одновременно появляются и толкают друг друга, взаимодействия, конечно, не так очевидны.

25 голосов
/ 28 октября 2011

Если у вас n занятых потоков, все они работают на 100% независимо, это будет лучше, чем n потоков в пуле Fork-Join (FJ).Но так никогда не получится.

Возможно, не удастся точно разбить проблему на n равных частей.Даже если вы это сделаете, планирование потоков может быть справедливым.В итоге вы ожидаете самую медленную ветку.Если у вас есть несколько задач, то каждый из них может выполняться с параллелизмом менее чем n (как правило, более эффективным), но все же переходить на n-way после завершения других задач.

Так почему бы нам просто не сократитьпроблема разбита на кусочки размером FJ и у них есть пул потоковТипичное использование FJ разбивает проблему на мелкие кусочки.Выполнение этого в случайном порядке требует большой координации на аппаратном уровне.Накладные расходы были бы убийцей.В FJ задачи помещаются в очередь, которую поток считывает в порядке «первым пришел - первым вышел» (LIFO / стек), а кража работы (обычно в основной работе) выполняется «первым пришел - первым вышел» (FIFO / «очередь»).В результате обработка длинного массива может выполняться в значительной степени последовательно, даже если она разбита на крошечные куски.(Это также тот случай, когда может быть нетривиальным разбить проблему на небольшие куски одинакового размера за один большой взрыв. Скажем, имея дело с некоторой формой иерархии без балансировки.)

Вывод: FJ позволяет большеэффективное использование аппаратных потоков в неравных ситуациях, что будет всегда, если у вас более одного потока.

14 голосов
/ 16 мая 2016

Конечная цель пулов потоков и Fork / Join схожи: оба хотят максимально использовать доступную мощность ЦП для максимальной пропускной способности.Максимальная пропускная способность означает, что как можно больше задач должно быть выполнено в течение длительного периода времени.Что для этого нужно?(В дальнейшем мы будем предполагать, что нет недостатка в вычислительных задачах: всегда достаточно сделать для 100% загрузки ЦП. Кроме того, я использую «ЦП» для ядер или виртуальных ядер эквивалентно в случае гиперпоточности).

  1. По крайней мере, должно быть столько потоков, сколько имеется доступных процессоров, потому что при меньшем количестве потоков ядро ​​не будет использоваться.
  2. Максимально должно быть столько потоков, сколько тамдоступны процессоры, потому что запуск большего количества потоков создаст дополнительную нагрузку для планировщика, который назначает процессоры различным потокам, что заставляет некоторое время процессора переходить к планировщику, а не к нашей вычислительной задаче.

Таким образом, мы решили, чтоиз-за того, что для максимальной пропускной способности нам нужно иметь одинаковое количество потоков, чем у процессоров.В примере размытия Oracle вы можете взять пул потоков фиксированного размера с количеством потоков, равным количеству доступных процессоров, или использовать пул потоков.Это не имеет значения, вы правы!

Так когда же у вас возникнут проблемы с пулами потоков?Это если поток блокирует , потому что ваш поток ожидает завершения другой задачи.Предположим следующий пример:

class AbcAlgorithm implements Runnable {
    public void run() {
        Future<StepAResult> aFuture = threadPool.submit(new ATask());
        StepBResult bResult = stepB();
        StepAResult aResult = aFuture.get();
        stepC(aResult, bResult);
    }
}

Здесь мы видим алгоритм, который состоит из трех этапов A, B и C. A и B могут выполняться независимо друг от друга, но этап C требует результатаШаг A и B. Этот алгоритм выполняет передачу задачи A в пул потоков и выполнение задачи b напрямую.После этого поток будет ожидать выполнения задачи A и продолжит выполнение шага C. Если A и B завершены одновременно, то все в порядке.Но что, если A займет больше времени, чем B?Это может быть связано с тем, что природа задачи A диктует это, но это также может быть связано с тем, что вначале нет задачи для задачи A, а задача A должна ждать.(Если доступен только один процессор и, следовательно, в вашем пуле потоков есть только один поток, это даже вызовет взаимоблокировку, но на данный момент это не имеет значения).Дело в том, что поток, который только что выполнил задачу B , блокирует весь поток .Поскольку у нас столько же потоков, сколько и у процессоров, и один поток заблокирован, это означает, что один процессор простаивает .

Fork / Join решает эту проблему: в среде fork / join вы 'd написать тот же алгоритм, как показано ниже:

class AbcAlgorithm implements Runnable {
    public void run() {
        ATask aTask = new ATask());
        aTask.fork();
        StepBResult bResult = stepB();
        StepAResult aResult = aTask.join();
        stepC(aResult, bResult);
    }
}

Выглядит так же, не так ли?Однако подсказка в том, что aTask.join не будет блокировать .Вместо этого вот где похищение работы вступает в игру: Поток будет искать другие задачи, которые были отменены в прошлом, и продолжит их.Сначала он проверяет, начали ли обрабатываться задачи, которые он разветвлял.Поэтому, если A еще не был запущен другим потоком, он будет делать A следующим, в противном случае он проверит очередь других потоков и украдет их работу.Как только эта другая задача другого потока будет завершена, она проверит, завершена ли сейчас A.Если это вышеуказанный алгоритм, можно вызвать stepC.В противном случае он будет искать еще одну задачу, чтобы украсть.Таким образом, пулы разветвления / объединения могут достичь 100% загрузки ЦП, даже несмотря на блокирующие действия .

Однако существует ловушка: кража работы возможна только для вызова joinForkJoinTask с.Это невозможно сделать для внешних блокирующих действий, таких как ожидание другого потока или ожидание действия ввода-вывода.Так что же, ожидание завершения ввода-вывода - обычная задача?В этом случае, если бы мы могли добавить дополнительный поток в пул Fork / Join, который будет снова остановлен, как только будет выполнено блокирующее действие, это будет вторым лучшим решением.И ForkJoinPool может действительно сделать это, если мы используем ManagedBlocker с.

Фибоначчи

В JavaDoc для RecursiveTask является примером для вычисления чисел Фибоначчи с использованием Fork / Join. Для классического рекурсивного решения см .:

public static int fib(int n) {
    if (n <= 1) {
        return n;
    }
    return fib(n - 1) + fib(n - 2);
}

Как объясняется в JavaDocs, это довольно простой способ вычисления чисел Фибоначчи, поскольку этот алгоритм имеет сложность O (2 ^ n), хотя возможны и более простые способы. Однако этот алгоритм очень прост и легок для понимания, поэтому мы придерживаемся его. Давайте предположим, что мы хотим ускорить это с помощью Fork / Join. Наивная реализация выглядела бы так:

class Fibonacci extends RecursiveTask<Long> {
    private final long n;

    Fibonacci(long n) {
        this.n = n;
    }

    public Long compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();
   }
}

Шаги, на которые разбивается эта Задача, слишком коротки, и поэтому они будут работать ужасно, но вы можете видеть, как каркас в целом работает очень хорошо: два слагаемых можно вычислить независимо, но тогда нам нужно, чтобы они оба построить окончательный результат. Так что одна половина делается в другой теме. Получайте удовольствие, делая то же самое с пулами потоков без тупиковой ситуации (возможно, но не так просто).

Просто для полноты: если вы действительно хотите рассчитать числа Фибоначчи, используя этот рекурсивный подход, то вот оптимизированная версия:

class FibonacciBigSubtasks extends RecursiveTask<Long> {
    private final long n;

    FibonacciBigSubtasks(long n) {
        this.n = n;
    }

    public Long compute() {
        return fib(n);
    }

    private long fib(long n) {
        if (n <= 1) {
            return 1;
        }
        if (n > 10 && getSurplusQueuedTaskCount() < 2) {
            final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
            final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
            f1.fork();
            return f2.compute() + f1.join();
        } else {
            return fib(n - 1) + fib(n - 2);
        }
    }
}

Благодаря этому подзадачи значительно уменьшаются, потому что они разделяются только в том случае, если истинно n > 10 && getSurplusQueuedTaskCount() < 2, что означает, что существует значительно более 100 вызовов методов (n > 10) и не очень сложные задачи уже ожидают (getSurplusQueuedTaskCount() < 2).

На моем компьютере (4 ядра (8 при подсчете Hyper-Threading), Intel (R) Core (TM) i7-2720QM CPU @ 2,20 ГГц) fib(50) занимает 64 секунды с классическим подходом и всего 18 секунд с подход Fork / Join, который является довольно заметным преимуществом, хотя и не настолько теоретически возможным.

Резюме

  • Да, в вашем примере Fork / Join не имеет преимущества перед классическими пулами потоков.
  • Fork / Join может значительно улучшить производительность при блокировке
  • Fork / Join позволяет обойти некоторые тупиковые проблемы
13 голосов
/ 28 октября 2011

Форк / соединение отличается от пула потоков, потому что он реализует кражу работы. От Форк / Регистрация

Как и в любом ExecutorService, инфраструктура fork / join распределяет задачи рабочим потокам в пуле потоков. Фреймворк fork / join отличается, потому что он использует алгоритм кражи работы. Рабочие темы что не хватает вещей, которые можно сделать, может украсть задачи из других потоков, которые все еще заняты.

Скажем, у вас есть два потока и 4 задания a, b, c, d, которые занимают 1, 1, 5 и 6 секунд соответственно. Первоначально a и b назначаются потоку 1, а c и d - потоку 2. В пуле потоков это займет 11 секунд. С помощью fork / join поток 1 завершает работу и может украсть работу из потока 2, поэтому задача d в ​​конечном итоге будет выполняться потоком 1. Поток 1 выполняет a, b и d, поток 2 просто c. Общее время: 8 секунд, а не 11.

РЕДАКТИРОВАТЬ: Как отмечает Joonas, задачи не обязательно предварительно выделены для потока. Идея fork / join заключается в том, что поток может выбрать разделение задачи на несколько частей. Итак, чтобы повторить выше:

У нас есть две задачи (ab) и (cd), которые занимают 2 и 11 секунд соответственно. Поток 1 начинает выполнять ab и разделяет его на две подзадачи a & b. Аналогично с потоком 2 он разделяется на две подзадачи c & d. Когда поток 1 закончил a & b, он может украсть d из потока 2.

12 голосов
/ 22 июня 2012

Все, кто выше, правы, преимущества достигаются за счет кражи работы, но если остановиться на том, почему это так.

Основным преимуществом является эффективная координация между рабочими потоками.Работу нужно разделить и собрать, что требует координации.Как вы можете видеть в ответе AH выше, у каждого потока есть свой рабочий список.Важным свойством этого списка является его сортировка (большие задачи вверху и небольшие задачи внизу).Каждый поток выполняет задачи в нижней части своего списка и ворует задачи из верхней части списков других потоков.

В результате получается:

  • Голова и хвост задачисписки могут синхронизироваться независимо, уменьшая количество конфликтов в списке.
  • Значимые поддеревья работы разделяются и повторно собираются одним и тем же потоком, поэтому для этих поддеревьев не требуется координация между потоками.Когда поток крадет работу, он берет большой кусок, который затем подразделяет на свой собственный список
  • Сталирование работы означает, что потоки почти полностью используются до конца процесса.Большинство других схем «разделяй и властвуй», использующих пулы потоков, требуют большего взаимодействия между потоками и координации.
11 голосов
/ 03 декабря 2013

В этом примере Fork / Join не добавляет значения, поскольку разветвление не требуется и рабочая нагрузка равномерно распределяется по рабочим потокам.Fork / Join только добавляет накладные расходы.

Вот хорошая статья на эту тему.Цитата:

В целом можно сказать, что ThreadPoolExecutor предпочтительнее, когда рабочая нагрузка равномерно распределяется по рабочим потокам.Чтобы гарантировать это, вам нужно точно знать, как выглядят входные данные.Напротив, ForkJoinPool обеспечивает хорошую производительность независимо от входных данных и, таким образом, является значительно более надежным решением.

8 голосов
/ 05 сентября 2012

Еще одним важным отличием является то, что с помощью F-J вы можете выполнять несколько сложных фаз «Соединение». Рассмотрим сортировку слиянием от http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture18.html,, поэтому для предварительного разделения этой работы потребуется слишком много оркестровки. например Вам нужно сделать следующие вещи:

  • сортировка за первый квартал
  • сортировка во втором квартале
  • объединить первые 2 квартала
  • сортировка в третьем квартале
  • Сортировка четвертого квартала
  • объединить последние 2 квартала
  • объединить 2 половинки

Как указать, что вы должны выполнять сортировку до слияния, которое их касается и т. Д.

Я искал, как лучше сделать определенную вещь для каждого из списка предметов. Я думаю, что я просто предварительно разделю список и буду использовать стандартный ThreadPool. FJ кажется наиболее полезным, когда работа не может быть предварительно разделена на достаточно независимые задачи, но может быть рекурсивно разделена на задачи, которые не зависят друг от друга (например, сортировка половин независима, но объединение 2 отсортированных половин в отсортированное целое не). 1023 *

6 голосов
/ 03 декабря 2012

F / J также имеет явное преимущество, когда у вас есть дорогие операции слияния.Поскольку он разбивается на древовидную структуру, вы выполняете только слияние log2 (n), а не n с линейным разделением потоков.(Это делает теоретическое предположение, что у вас столько же процессоров, сколько потоков, но все же преимущество). Для домашнего задания нам пришлось объединить несколько тысяч двумерных массивов (все одинаковые измерения) путем суммирования значений в каждом индексе.С процессорами fork join и P время приближается к log2 (n), а P приближается к бесконечности.

1 2 3 .. 7 3 1 .... 8 5 4
4 5 6 + 2 4 3 => 6 9 9
7 8 9 .. 1 1 0 .... 8 9 9

2 голосов
/ 21 апреля 2016

Если проблема такова, что нам нужно дождаться завершения других потоков (как в случае сортировки массива или суммы массива), следует использовать fork join, поскольку Executor (Executors.newFixedThreadPool (2)) захлебнетсяиз-за ограниченного количества потоков.В этом случае пул forkjoin создаст больше потоков для сокрытия для заблокированного потока, чтобы поддерживать тот же параллелизм

Источник: http://www.oracle.com/technetwork/articles/java/fork-join-422606.html

Проблема с исполнителями для реализацииАлгоритмы «разделяй и властвуй» не связаны с созданием подзадач, потому что вызываемый объект может свободно отправлять новую подзадачу своему исполнителю и ожидать ее результата синхронно или асинхронно.Проблема заключается в параллелизме: когда Callable ожидает результата другого Callable, он переводится в состояние ожидания, тратя впустую возможность обработки другого Callable, поставленного в очередь на выполнение.

Добавлена ​​структура fork / joinк пакету java.util.concurrent в Java SE 7, благодаря усилиям Дуга Ли, этот пробел заполняется

Источник: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

Пул пытается поддерживать достаточно активный (или доступные) потоки путем динамического добавления, приостановки или возобновления внутренних рабочих потоков, даже если некоторые задачи останавливаются в ожидании присоединения к другим.Однако такие настройки не гарантируются в условиях заблокированного ввода-вывода или другой неуправляемой синхронизации

public int getPoolSize () Возвращает количество рабочих потоков, которые были запущены, но еще не завершены. Результат, возвращаемый этим методом, может отличаться от getParallelism (), когда потоки создаются для поддержания параллелизма, когда другие блокируются совместно.

2 голосов
/ 19 октября 2015

Вы будете поражены производительностью ForkJoin в таких приложениях, как crawler. Вот лучший учебник , из которого вы могли бы поучиться.

Логика Fork / Join очень проста: (1) отдельная (форк) каждая большая задача в меньшие задачи; (2) обрабатывать каждую задачу в отдельном потоке (при необходимости разделяя их на еще более мелкие задачи); (3) присоединиться к Результаты.

...