Какая самая быстрая циклическая синхронизация в Java (ExecutorService против CyclicBarrier против X)? - PullRequest
15 голосов
/ 26 апреля 2010

Какая конструкция синхронизации Java, вероятно, обеспечит наилучшее производительность для параллельного итеративного сценария обработки с фиксированное количество потоков, как указано ниже? После экспериментов на некоторое время самостоятельно (используя ExecutorService и CyclicBarrier) и Буду несколько удивлен результатами, я был бы благодарен за некоторые совет специалиста и, возможно, некоторые новые идеи. Существующие вопросы здесь делают не похоже, чтобы сосредоточиться в первую очередь на производительности, следовательно, этот новый. Заранее спасибо!

Ядром приложения является простой итеративный алгоритм обработки данных, распараллелено, чтобы распределить вычислительную нагрузку по 8 ядрам на Mac Pro, работающий под управлением OS X 10.6 и Java 1.6.0_07. Данные для обработки разбивается на 8 блоков, и каждый блок подается в Runnable для выполнения одним из фиксированного числа потоков. Распараллеливание алгоритма было довольно просто, и это функционально работает как хотелось бы, но его производительность еще не такая, как я думаю. Приложение кажется тратить много времени на синхронизацию системных вызовов, поэтому через некоторое время Интересно, выбрал ли я наиболее подходящий механизм (ы) синхронизации.

Ключевым требованием алгоритма является то, что ему необходимо продолжить этапы, поэтому потоки должны синхронизироваться в конце каждого этапа. Основной поток готовит работу (очень низкие накладные расходы), передает ее потоки, позволяет им работать над этим, затем продолжается, когда все потоки сделано, переставляет работу (опять же очень низкие накладные расходы) и повторяется цикл. Машина, посвященная этой задаче, Сборка мусора минимизируется с помощью пулов для каждого потока предварительно выделенных элементов, и количество потоков может быть фиксированным (без входящих запросов и т. п., только один поток на ядро ​​процессора).

V1 - ExecutorService

Моя первая реализация использовала ExecutorService с 8 работниками потоки. Программа создает 8 задач, удерживая работу, а затем позволяет им работать над этим, примерно так:

// create one thread per CPU
executorService = Executors.newFixedThreadPool( 8 );
...
// now process data in cycles
while( ...) {
    // package data into 8 work items
    ...

    // create one Callable task per work item
    ...

    // submit the Callables to the worker threads
    executorService.invokeAll( taskList );
}

Это хорошо работает функционально (оно делает то, что должно), и для очень большие рабочие элементы действительно все 8 процессоров становятся очень загруженными, как столько, сколько алгоритм обработки должен был бы позволить (некоторые рабочие элементы будут заканчиваться быстрее других, затем простаивать). Тем не мение, как рабочие элементы становятся меньше (и это на самом деле не под управление программой), пользовательская загрузка ЦП резко уменьшается:

blocksize | system | user | cycles/sec
256k        1.8%    85%     1.30
64k         2.5%    77%     5.6
16k         4%      64%     22.5
4096        8%      56%     86
1024       13%      38%     227
256        17%      19%     420
64         19%      17%     948
16         19%      13%     1626

Условные обозначения: - размер блока = размер рабочего элемента (= вычислительные шаги) - система = загрузка системы, как показано в OS X Activity Monitor (красная полоса) - пользователь = пользовательская загрузка, как показано в OS X Activity Monitor (зеленая полоса) - циклов / сек = итерации основного цикла while, чем больше, тем лучше

Основной проблемой здесь является высокий процент затраченного времени в системе, которая, кажется, управляется синхронизацией потоков звонки. Как и ожидалось, для небольших рабочих элементов, ExecutorService.invokeAll () потребует относительно больше усилий для синхронизации потоков по сравнению с объемом работы, выполняемой в каждом потоке. Но поскольку ExecutorService является более общим, чем это должно быть для этого варианта использования (он может ставить задачи для потоков, если есть больше задач, чем ядер), хотя я, может быть, был бы худее конструкция синхронизации.

V2 - CyclicBarrier

Следующая реализация использовала CyclicBarrier для синхронизации темы до получения работы и после ее завершения, примерно так:

main() {
    // create the barrier
    barrier = new CyclicBarrier( 8 + 1 );

    // create Runable for thread, tell it about the barrier
    Runnable task = new WorkerThreadRunnable( barrier );

    // start the threads
    for( int i = 0; i < 8; i++ )
    {
        // create one thread per core
        new Thread( task ).start();
    }

    while( ... ) {
        // tell threads about the work
        ...

        // N threads + this will call await(), then system proceeds
        barrier.await();

        // ... now worker threads work on the work...

        // wait for worker threads to finish
        barrier.await();
    }
}

class WorkerThreadRunnable implements Runnable {
    CyclicBarrier barrier;

    WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }

    public void run()
    {
        while( true )
        {
            // wait for work
            barrier.await();

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

Опять же, это хорошо работает функционально (оно делает то, что должно), и для очень больших рабочих элементов действительно все 8 процессоров становятся очень загружается, как и раньше. Однако, когда рабочие элементы становятся меньше, нагрузка по-прежнему резко уменьшается:

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.7%     78%    6.1
16k         5.5%     52%    25
4096        9%       29%    64
1024       11%       15%    117
256        12%        8%    169
64         12%        6.5%  285
16         12%        6%    377

Для больших рабочих элементов синхронизация незначительна и производительность идентична V1. Но неожиданно результаты (узкоспециализированный) CyclicBarrier, кажется, гораздо больше, чем те для (общего) ExecutorService: пропускная способность (циклов / сек)только около 1/4 от V1. Предварительный вывод будет что, хотя это кажется рекламируемым идеальным использованием случай для CyclicBarrier, он работает намного хуже, чем универсальный ExecutorService.

V3 - Ожидание / Уведомление + CyclicBarrier

Казалось, стоит попытаться заменить первый циклический барьер, ожидающий () с простым механизмом ожидания / уведомления:

main() {
    // create the barrier
    // create Runable for thread, tell it about the barrier
    // start the threads

    while( ... ) {
        // tell threads about the work
        // for each: workerThreadRunnable.setWorkItem( ... );

        // ... now worker threads work on the work...

        // wait for worker threads to finish
        barrier.await();
    }
}

class WorkerThreadRunnable implements Runnable {
    CyclicBarrier barrier;
    @NotNull volatile private Callable<Integer> workItem;

    WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }

    final protected void
    setWorkItem( @NotNull final Callable<Integer> callable )
    {
        synchronized( this )
        {
            workItem = callable;
            notify();
        }
    }

    public void run()
    {
        while( true )
        {
            // wait for work
            while( true )
            {
                synchronized( this )
                {
                    if( workItem != NO_WORK ) break;

                    try
                    {
                        wait();
                    }
                    catch( InterruptedException e ) { e.printStackTrace(); }
                }
            }

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

Опять же, это хорошо работает функционально (оно делает то, что должно).

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.4%     80%    6.3
16k         4.6%     60%    30.1
4096        8.6%     41%    98.5
1024       12%       23%    202
256        14%       11.6%  299
64         14%       10.0%  518
16         14.8%      8.7%  679

Пропускная способность для небольших рабочих элементов все еще намного хуже из ExecutorService, но примерно в 2 раза больше, чем у CyclicBarrier. Устранение одного CyclicBarrier устраняет половину разрыва.

V4 - Ожидание занято вместо ожидания / уведомления

Так как это приложение является основным, работающим в системе и в любом случае ядра простаивают, если они не заняты рабочим элементом, почему бы не попробовать занятое ожидание рабочих элементов в каждом потоке, даже если это раскручивает процессор без необходимости. Изменен код рабочего потока следующим образом:

class WorkerThreadRunnable implements Runnable {
    // as before

    final protected void
    setWorkItem( @NotNull final Callable<Integer> callable )
    {
        workItem = callable;
    }

    public void run()
    {
        while( true )
        {
            // busy-wait for work
            while( true )
            {
                if( workItem != NO_WORK ) break;
            }

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

Также хорошо работает функционально (делает то, что должен).

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.2%     81%    6.3
16k         4.2%     62%     33
4096        7.5%     40%    107
1024       10.4%     23%    210
256        12.0%    12.0%   310
64         11.9%    10.2%   550
16         12.2%     8.6%   741

Для небольших рабочих элементов это увеличивает пропускную способность еще 10% по сравнению с CyclicBarrier + вариант ожидания / уведомления, который не является незначительный. Но это все еще намного ниже пропускной способности, чем V1 с ExecutorService.

V5 -?

Итак, каков наилучший механизм синхронизации для такого (предположительно не редкость) проблема? Я устал писать свои собственный механизм синхронизации для полной замены ExecutorService (при условии, что это слишком общий характер и должно быть что-то это все еще можно вынуть, чтобы сделать его более эффективным). Это не моя область знаний, и я обеспокоен тем, что потратить много времени на его отладку (так как я даже не уверен мои варианты ожидания / уведомления и занятого ожидания верны) для неопределенное усиление.

Любой совет будет принят с благодарностью.

Ответы [ 6 ]

6 голосов
/ 05 октября 2012

Кажется, что вам не нужна синхронизация между рабочими. Возможно, вам следует рассмотреть возможность использования инфраструктуры ForkJoin, которая доступна в Java 7, а также отдельной библиотеки. Некоторые ссылки:

3 голосов
/ 27 апреля 2010

Обновление: V6 - Ожидание занятости, с основным потоком также работает

Очевидное улучшение V5 (ожидание занятости в 7 рабочих потоках, ожидание завершения в основном потоке), похоже, снова разделило работу на 7 + 1 частей и позволило главному потоку обрабатывать одну часть одновременно с другой рабочие потоки (вместо просто занятого ожидания), и впоследствии занятого ожидания для завершения всех рабочих элементов других потоков. Это будет использовать 8-й процессор (в конфигурации с 8 ядрами примера) и добавить его циклы в доступный пул вычислительных ресурсов.

Это было действительно просто реализовать. И результаты действительно снова немного лучше:

blocksize | system | user | cycles/sec
256k        1.0%     98%       1.39
64k         1.0%     98%       6.8
16k         1.0%     98%      50.4
4096        1.0%     98%     372
1024        1.0%     98%    1317
256         1.0%     98%    3546
64          1.5%     98%    9091
16          2.0%     98%   16949

Так что, похоже, это лучшее решение на сегодняшний день.

1 голос
/ 05 февраля 2011

Просто нажмите на эту ветку, и хотя ей почти год, позвольте мне указать вам на библиотеку "jbarrier", которую мы разработали в Боннском университете несколько месяцев назад:

http://net.cs.uni -bonn.de / WG / CS / приложений / jbarrier /

Барьерный пакет нацелен именно на тот случай, когда количество рабочих потоков <= количество ядер. Пакет основан на ожидании занятости, он поддерживает не только барьерные действия, но и глобальные сокращения, и, кроме центрального барьера, он предлагает древовидные барьеры для параллелизации частей синхронизации / сокращения еще больше.

1 голос
/ 28 апреля 2010

Обновление: V7 - Ожидание занято, которое переходит в Ожидание / Уведомление

После некоторой игры с V6 оказывается, что занятые ожидания немного затеняют реальные горячие точки приложения при профилировании. Кроме того, вентилятор в системе продолжает перегружаться, даже если никакие рабочие элементы не обрабатываются. Таким образом, дальнейшее улучшение состояло в том, чтобы заняться ожиданием рабочих элементов в течение фиксированного периода времени (скажем, около 2 миллисекунд), а затем вернуться к «более приятной» комбинации wait () / notify (). Рабочие потоки просто публикуют свой текущий режим ожидания в основном потоке с помощью атомарного логического значения, которое указывает, заняты ли они ожиданием (и, следовательно, просто нужно установить рабочий элемент) или ожидают ли они вызова notify (), потому что они ждать ().

Еще одно улучшение, которое оказалось довольно простым, заключалось в том, чтобы позволить потокам, которые завершили свой основной рабочий элемент, повторно вызывать предоставленный клиентом обратный вызов, пока они ожидают, пока другие потоки завершат свои основные рабочие элементы. Таким образом, время ожидания (которое происходит из-за того, что потоки связаны с несколько иными рабочими нагрузками) не должно быть полностью потеряно приложением.

Мне все еще очень интересно узнать мнение других пользователей, которые сталкивались с подобным сценарием использования.

1 голос

Мне также интересно, не могли бы вы попробовать более 8 потоков. Если ваш процессор поддерживает HyperThreading, то (по крайней мере, теоретически) вы можете сжать 2 потока на ядро ​​и посмотреть, что из этого выйдет.

1 голос
/ 26 апреля 2010

Обновление: V5 - занято Ожидание во всех потоках (пока кажется оптимальным)

Поскольку все ядра выделены для этой задачи, казалось, что стоит попытаться просто устранить все сложные конструкции синхронизации и выполнить активное ожидание в каждой точке синхронизации во всех потоках. Оказывается, это побеждает все другие подходы с большим отрывом.

Настройка следующая: начните с V4 выше (CyclicBarrier + Busy Wait). Замените CyclicBarrier на AtomicInteger, который основной поток сбрасывает в ноль каждый цикл. Каждый рабочий поток Runnable, который завершает свою работу, увеличивает атомное целое число на единицу. Основной поток занят ждет:

while( true ) {
    // busy-wait for threads to complete their work
    if( atomicInt.get() >= workerThreadCount ) break;
}

Вместо 8 запускаются только 7 рабочих потоков (поскольку все потоки, включая основной, теперь загружают ядро ​​почти полностью). Результаты следующие:

blocksize | system | user | cycles/sec
256k        1.0%     98%       1.36
64k         1.0%     98%       6.8
16k         1.0%     98%      44.6
4096        1.0%     98%     354
1024        1.0%     98%    1189
256         1.0%     98%    3222
64          1.5%     98%    8333
16          2.0%     98%   16129

Использование ожидания / уведомления в рабочих потоках снижает пропускную способность примерно до 1/3 этого решения.

...