Какая конструкция синхронизации Java, вероятно, обеспечит наилучшее
производительность для параллельного итеративного сценария обработки с
фиксированное количество потоков, как указано ниже? После экспериментов
на некоторое время самостоятельно (используя ExecutorService и CyclicBarrier) и
Буду несколько удивлен результатами, я был бы благодарен за некоторые
совет специалиста и, возможно, некоторые новые идеи. Существующие вопросы здесь делают
не похоже, чтобы сосредоточиться в первую очередь на производительности, следовательно, этот новый.
Заранее спасибо!
Ядром приложения является простой итеративный алгоритм обработки данных,
распараллелено, чтобы распределить вычислительную нагрузку по 8 ядрам на
Mac Pro, работающий под управлением OS X 10.6 и Java 1.6.0_07. Данные для обработки
разбивается на 8 блоков, и каждый блок подается в Runnable для выполнения
одним из фиксированного числа потоков. Распараллеливание алгоритма было
довольно просто, и это функционально работает как хотелось бы, но
его производительность еще не такая, как я думаю. Приложение кажется
тратить много времени на синхронизацию системных вызовов, поэтому через некоторое время
Интересно, выбрал ли я наиболее подходящий
механизм (ы) синхронизации.
Ключевым требованием алгоритма является то, что ему необходимо продолжить
этапы, поэтому потоки должны синхронизироваться в конце каждого этапа.
Основной поток готовит работу (очень низкие накладные расходы), передает ее
потоки, позволяет им работать над этим, затем продолжается, когда все потоки
сделано, переставляет работу (опять же очень низкие накладные расходы) и повторяется
цикл. Машина, посвященная этой задаче, Сборка мусора
минимизируется с помощью пулов для каждого потока предварительно выделенных элементов, и
количество потоков может быть фиксированным (без входящих запросов и т. п.,
только один поток на ядро процессора).
V1 - ExecutorService
Моя первая реализация использовала ExecutorService с 8 работниками
потоки. Программа создает 8 задач, удерживая работу, а затем
позволяет им работать над этим, примерно так:
// create one thread per CPU
executorService = Executors.newFixedThreadPool( 8 );
...
// now process data in cycles
while( ...) {
// package data into 8 work items
...
// create one Callable task per work item
...
// submit the Callables to the worker threads
executorService.invokeAll( taskList );
}
Это хорошо работает функционально (оно делает то, что должно), и для
очень большие рабочие элементы действительно все 8 процессоров становятся очень загруженными, как
столько, сколько алгоритм обработки должен был бы позволить (некоторые
рабочие элементы будут заканчиваться быстрее других, затем простаивать). Тем не мение,
как рабочие элементы становятся меньше (и это на самом деле не под
управление программой), пользовательская загрузка ЦП резко уменьшается:
blocksize | system | user | cycles/sec
256k 1.8% 85% 1.30
64k 2.5% 77% 5.6
16k 4% 64% 22.5
4096 8% 56% 86
1024 13% 38% 227
256 17% 19% 420
64 19% 17% 948
16 19% 13% 1626
Условные обозначения:
- размер блока = размер рабочего элемента (= вычислительные шаги)
- система = загрузка системы, как показано в OS X Activity Monitor (красная полоса)
- пользователь = пользовательская загрузка, как показано в OS X Activity Monitor (зеленая полоса)
- циклов / сек = итерации основного цикла while, чем больше, тем лучше
Основной проблемой здесь является высокий процент затраченного времени
в системе, которая, кажется, управляется синхронизацией потоков
звонки. Как и ожидалось, для небольших рабочих элементов, ExecutorService.invokeAll ()
потребует относительно больше усилий для синхронизации потоков
по сравнению с объемом работы, выполняемой в каждом потоке. Но
поскольку ExecutorService является более общим, чем это должно быть
для этого варианта использования (он может ставить задачи для потоков, если есть
больше задач, чем ядер), хотя я, может быть, был бы худее
конструкция синхронизации.
V2 - CyclicBarrier
Следующая реализация использовала CyclicBarrier для синхронизации
темы до получения работы и после ее завершения,
примерно так:
main() {
// create the barrier
barrier = new CyclicBarrier( 8 + 1 );
// create Runable for thread, tell it about the barrier
Runnable task = new WorkerThreadRunnable( barrier );
// start the threads
for( int i = 0; i < 8; i++ )
{
// create one thread per core
new Thread( task ).start();
}
while( ... ) {
// tell threads about the work
...
// N threads + this will call await(), then system proceeds
barrier.await();
// ... now worker threads work on the work...
// wait for worker threads to finish
barrier.await();
}
}
class WorkerThreadRunnable implements Runnable {
CyclicBarrier barrier;
WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }
public void run()
{
while( true )
{
// wait for work
barrier.await();
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
Опять же, это хорошо работает функционально (оно делает то, что должно),
и для очень больших рабочих элементов действительно все 8 процессоров становятся очень
загружается, как и раньше. Однако, когда рабочие элементы становятся меньше,
нагрузка по-прежнему резко уменьшается:
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.7% 78% 6.1
16k 5.5% 52% 25
4096 9% 29% 64
1024 11% 15% 117
256 12% 8% 169
64 12% 6.5% 285
16 12% 6% 377
Для больших рабочих элементов синхронизация незначительна и
производительность идентична V1. Но неожиданно результаты
(узкоспециализированный) CyclicBarrier, кажется, гораздо больше, чем
те для (общего) ExecutorService: пропускная способность (циклов / сек)только около 1/4 от V1. Предварительный вывод будет
что, хотя это кажется рекламируемым идеальным использованием
случай для CyclicBarrier, он работает намного хуже, чем
универсальный ExecutorService.
V3 - Ожидание / Уведомление + CyclicBarrier
Казалось, стоит попытаться заменить первый циклический барьер, ожидающий ()
с простым механизмом ожидания / уведомления:
main() {
// create the barrier
// create Runable for thread, tell it about the barrier
// start the threads
while( ... ) {
// tell threads about the work
// for each: workerThreadRunnable.setWorkItem( ... );
// ... now worker threads work on the work...
// wait for worker threads to finish
barrier.await();
}
}
class WorkerThreadRunnable implements Runnable {
CyclicBarrier barrier;
@NotNull volatile private Callable<Integer> workItem;
WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }
final protected void
setWorkItem( @NotNull final Callable<Integer> callable )
{
synchronized( this )
{
workItem = callable;
notify();
}
}
public void run()
{
while( true )
{
// wait for work
while( true )
{
synchronized( this )
{
if( workItem != NO_WORK ) break;
try
{
wait();
}
catch( InterruptedException e ) { e.printStackTrace(); }
}
}
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
Опять же, это хорошо работает функционально (оно делает то, что должно).
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.4% 80% 6.3
16k 4.6% 60% 30.1
4096 8.6% 41% 98.5
1024 12% 23% 202
256 14% 11.6% 299
64 14% 10.0% 518
16 14.8% 8.7% 679
Пропускная способность для небольших рабочих элементов все еще намного хуже
из ExecutorService, но примерно в 2 раза больше, чем у CyclicBarrier.
Устранение одного CyclicBarrier устраняет половину разрыва.
V4 - Ожидание занято вместо ожидания / уведомления
Так как это приложение является основным, работающим в системе и
в любом случае ядра простаивают, если они не заняты рабочим элементом,
почему бы не попробовать занятое ожидание рабочих элементов в каждом потоке, даже если
это раскручивает процессор без необходимости. Изменен код рабочего потока
следующим образом:
class WorkerThreadRunnable implements Runnable {
// as before
final protected void
setWorkItem( @NotNull final Callable<Integer> callable )
{
workItem = callable;
}
public void run()
{
while( true )
{
// busy-wait for work
while( true )
{
if( workItem != NO_WORK ) break;
}
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
Также хорошо работает функционально (делает то, что должен).
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.2% 81% 6.3
16k 4.2% 62% 33
4096 7.5% 40% 107
1024 10.4% 23% 210
256 12.0% 12.0% 310
64 11.9% 10.2% 550
16 12.2% 8.6% 741
Для небольших рабочих элементов это увеличивает пропускную способность еще
10% по сравнению с CyclicBarrier + вариант ожидания / уведомления, который не является
незначительный. Но это все еще намного ниже пропускной способности, чем V1
с ExecutorService.
V5 -?
Итак, каков наилучший механизм синхронизации для такого
(предположительно не редкость) проблема? Я устал писать свои
собственный механизм синхронизации для полной замены ExecutorService
(при условии, что это слишком общий характер и должно быть что-то
это все еще можно вынуть, чтобы сделать его более эффективным).
Это не моя область знаний, и я обеспокоен тем, что
потратить много времени на его отладку (так как я даже не уверен
мои варианты ожидания / уведомления и занятого ожидания верны) для
неопределенное усиление.
Любой совет будет принят с благодарностью.