Java-параллелизм: много писателей, один читатель - PullRequest
17 голосов
/ 29 марта 2010

Мне нужно собрать статистику в моем программном обеспечении, и я пытаюсь сделать это быстро и правильно, что непросто (для меня!)

сначала мой код с двумя классами, StatsService и StatsHarvester

public class StatsService
{
private Map<String, Long>   stats   = new HashMap<String, Long>(1000);

public void notify ( String key )
{
    Long value = 1l;
    synchronized (stats)
    {
        if (stats.containsKey(key))
        {
            value = stats.get(key) + 1;
        }
        stats.put(key, value);
    }
}

public Map<String, Long> getStats ( )
{
    Map<String, Long> copy;
    synchronized (stats)
    {
        copy = new HashMap<String, Long>(stats);
        stats.clear();
    }
    return copy;
}
}

это мой второй класс, харвестер, который время от времени собирает статистику и записывает ее в базу данных.

public class StatsHarvester implements Runnable
{
private StatsService    statsService;
private Thread          t;

public void init ( )
{
    t = new Thread(this);
    t.start();
}

public synchronized void run ( )
{
    while (true)
    {
        try
        {
            wait(5 * 60 * 1000); // 5 minutes
            collectAndSave();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

private void collectAndSave ( )
{
    Map<String, Long> stats = statsService.getStats();
    // do something like:
    // saveRecords(stats);
}
}

Во время выполнения он будет иметь около 30 одновременно работающих потоков, каждый из которых будет вызывать notify(key) около 100 раз. Только один StatsHarvester вызывает statsService.getStats()

Так что у меня много писателей и только один читатель. было бы неплохо иметь точную статистику, но мне все равно, будут ли потеряны некоторые записи при высоком параллелизме.

Читатель должен запускаться каждые 5 минут или что-либо разумное.

Письмо должно быть максимально быстрым. Чтение должно быть быстрым, но если оно блокируется примерно на 300 мс каждые 5 минут, это нормально.

Я прочитал много документов (параллелизм Java на практике, эффективный Java и т. Д.), Но у меня есть сильное чувство, что мне нужен ваш совет, чтобы сделать это правильно.

Надеюсь, я изложил свою проблему достаточно ясно и кратко, чтобы получить ценную помощь.


EDIT

Спасибо всем за подробные и полезные ответы. Как я и ожидал, есть несколько способов сделать это.

Я протестировал большинство ваших предложений (которые я понял) и загрузил тестовый проект в код Google для дальнейшего использования (проект maven)

http://code.google.com/p/javastats/

Я протестировал различные реализации моего StatsService

  • HashMapStatsService (HMSS)
  • ConcurrentHashMapStatsService (CHMSS)
  • LinkedQueueStatsService (LQSS)
  • GoogleStatsService (GSS)
  • ExecutorConcurrentHashMapStatsService (ECHMSS)
  • ExecutorHashMapStatsService (EHMSS)

и я проверил их с x числом потоков, каждое из которых вызывало уведомление y раз, результаты в мс

         10,100   10,1000  10,5000  50,100   50,1000  50,5000  100,100  100,1000 100,5000 
GSS       1        5        17       7        21       117      7        37       254       Summe: 466
ECHMSS    1        6        21       5        32       132      8        54       249       Summe: 508
HMSS      1        8        45       8        52       233      11       103      449       Summe: 910
EHMSS     1        5        24       7        31       113      8        67       235       Summe: 491
CHMSS     1        2        9        3        11       40       7        26       72        Summe: 171
LQSS      0        3        11       3        16       56       6        27       144       Summe: 266

В данный момент я думаю, что буду использовать ConcurrentHashMap, поскольку он предлагает хорошую производительность, в то время как его довольно легко понять.

Спасибо за ваш вклад! Janning

Ответы [ 9 ]

16 голосов
/ 29 марта 2010

Поскольку Джек ускользал от вас, вы можете использовать библиотеку java.util.concurrent, которая включает в себя ConcurrentHashMap и AtomicLong. Вы можете поместить AtomicLong, если он отсутствует, вы можете увеличить значение. Поскольку AtomicLong является потокобезопасным, вы сможете увеличивать переменную, не беспокоясь о проблеме параллелизма.

public void notify(String key) {
    AtomicLong value = stats.get(key);
    if (value == null) {
        value = stats.putIfAbsent(key, new AtomicLong(1));
    }
    if (value != null) {
        value.incrementAndGet();
    }
}

Это должно быть как быстро, так и безопасно для потоков

Редактировать: Перефразировано, поэтому не более двух поисков.

8 голосов
/ 29 марта 2010

Почему вы не используете java.util.concurrent.ConcurrentHashMap<K, V>? Он обрабатывает все внутренне, избегая ненужных блокировок на карте и экономя вам много работы: вам не придется заботиться о синхронизации при получении и сдаче ..

Из документации:

Хеш-таблица, поддерживающая полный параллелизм получения и настраиваемый ожидаемый параллелизм для обновлений. Этот класс подчиняется той же функциональной спецификации, что и Hashtable, и включает версии методов, соответствующие каждому методу Hashtable. Однако, несмотря на то, что все операции являются поточно-ориентированными, операции извлечения не влекут за собой блокировку , и нет никакой поддержки для блокировки всей таблицы таким образом, чтобы предотвратить любой доступ.

Вы можете указать уровень параллелизма :

Допустимый параллелизм среди операций обновления определяется необязательным аргументом конструктора concurrencyLevel (по умолчанию 16), который используется в качестве подсказки для внутреннего определения размера . Таблица внутренне разделена, чтобы попытаться разрешить указанное количество одновременных обновлений без конфликта. Поскольку размещение в хеш-таблицах по существу случайное, фактический параллелизм будет отличаться. В идеале вы должны выбрать значение, чтобы вместить столько потоков, сколько когда-либо будет одновременно изменяться таблица . Использование значительно более высокого значения, чем вам нужно, может тратить пространство и время, а значительно более низкое значение может привести к конфликту потоков. Но завышения и недооценки в пределах порядка величины обычно не оказывают заметного влияния. Значение 1 подходит, когда известно, что будет изменен только один поток, а все остальные будут только читать. Кроме того, изменение размера этой или любого другого вида хеш-таблицы является относительно медленной операцией, поэтому, когда это возможно, рекомендуется предоставлять оценки ожидаемых размеров таблиц в конструкторах.

Как предлагается в комментариях, внимательно прочитайте документацию ConcurrentHashMap , особенно когда в нем говорится об атомарных или неатомарных операциях.

Чтобы иметь гарантию атомарности, вы должны учитывать, какие операции являются атомарными, из интерфейса ConcurrentMap вы будете знать, что:

V putIfAbsent(K key, V value)
V replace(K key, V value)
boolean replace(K key,V oldValue, V newValue)
boolean remove(Object key, Object value)

можно безопасно использовать.

5 голосов
/ 29 марта 2010

Я бы посоветовал взглянуть на библиотеку Java util.concurrent. Я думаю, что вы можете реализовать это решение намного чище. Я не думаю, что вам нужна карта здесь вообще. Я бы порекомендовал реализовать это, используя ConcurrentLinkedQueue . Каждый «продюсер» может свободно писать в эту очередь, не беспокоясь о других. Он может поместить объект в очередь с данными для своей статистики.

Харвестер может использовать очередь, постоянно извлекая данные и обрабатывая их. Затем он может хранить его так, как ему нужно.

4 голосов
/ 29 марта 2010

Ответ Криса Дейла выглядит как хороший подход.

Другой альтернативой может быть одновременное использование Multiset. В библиотеке Google Collections есть одна. Вы можете использовать это следующим образом:

private Multiset<String> stats = ConcurrentHashMultiset.create();

public void notify ( String key )
{
    stats.add(key, 1);
}

Глядя на источник , это реализовано с использованием ConcurrentHashMap и использованием putIfAbsent и версии replace с тремя аргументами для обнаружения одновременных изменений и повторных попыток.

3 голосов
/ 29 марта 2010

Другой подход к проблеме состоит в использовании (тривиальной) безопасности потока через ограничение потока. По сути, создайте один фоновый поток, который будет заботиться как о чтении, так и о письме. Обладает довольно хорошими характеристиками с точки зрения масштабируемости и простоты.

Идея состоит в том, что вместо всех потоков, пытающихся обновить данные напрямую, они создают задачу «обновления» для обработки фоновым потоком. Этот же поток также может выполнять задачу чтения, если допустить, что некоторые задержки при обработке обновлений допустимы.

Этот дизайн довольно приятный, потому что потокам больше не придется бороться за блокировку для обновления данных, и поскольку карта ограничена одним потоком, вы можете просто использовать простой HashMap для выполнения get / put и т. Д. С точки зрения реализации, это будет означать создание однопоточного исполнителя и отправку задач записи, которые также могут выполнять необязательную операцию «collectAndSave».

Эскиз кода может выглядеть следующим образом:

public class StatsService {
    private ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Map<String,Long> stats = new HashMap<String,Long>();

    public void notify(final String key) {
        Runnable r = new Runnable() {
            public void run() {
                Long value = stats.get(key);
                if (value == null) {
                    value = 1L;
                } else {
                    value++;
                }
                stats.put(key, value);
                // do the optional collectAndSave periodically
                if (timeToDoCollectAndSave()) {
                    collectAndSave();
                }
            }
        };
        executor.execute(r);
    }
}

Существует BlockingQueue, связанный с исполнителем, и каждый поток, который создает задачу для StatsService, использует BlockingQueue. Ключевой момент заключается в следующем: длительность блокировки для этой операции должна быть намного короче , чем длительность блокировки в исходном коде, поэтому конкуренция должна быть намного меньше. В целом это должно привести к гораздо лучшей пропускной способности и задержке.

Другое преимущество заключается в том, что, поскольку только один поток читает и записывает на карту, можно использовать обычный HashMap и примитивный тип long (без участия ConcurrentHashMap или атомарных типов). Это также упрощает код, который действительно обрабатывает его.

Надеюсь, это поможет.

1 голос
/ 29 марта 2010

Вы смотрели в ScheduledThreadPoolExecutor? Вы можете использовать это для составления расписания ваших писателей, которые могут писать в параллельную коллекцию, такую ​​как ConcurrentLinkedQueue, упомянутая @Chris Dail. У вас может быть отдельно запланированное задание для чтения из очереди по мере необходимости, и Java SDK должен решить практически все ваши проблемы параллелизма, не требуя ручной блокировки.

0 голосов
/ 30 апреля 2014

Вот как это сделать с минимальным влиянием на производительность измеряемых потоков. Это самое быстрое решение, возможное в Java, без использования специальных аппаратных регистров для подсчета производительности.

Пусть каждый поток выводит свою статистику независимо от других, то есть без синхронизации, на некоторый объект статистики. Сделайте поле, содержащее счетчик изменчивым, чтобы оно было ограждено от памяти:

class Stats
{
   public volatile long count;
}

class SomeRunnable implements Runnable
{
   public void run()
   {
     doStuff();
     stats.count++;
   }
}

Иметь другой поток, который содержит ссылку на все объекты Stats, периодически обходить их все и складывать счетчики во всех потоках:

public long accumulateStats()
{
   long count = previousCount;

   for (Stats stat : allStats)
   {
       count += stat.count;
   }

   long resultDelta = count - previousCount;
   previousCount = count;

   return resultDelta;
}

Этот поток собирателей также нуждается в добавлении сна () (или некоторого другого газа). Например, он может периодически выводить счетчики в секунду на консоль, чтобы дать вам «живое» представление о том, как работает ваше приложение.

Это позволяет избежать излишних затрат на синхронизацию.

Другая хитрость, которую следует рассмотреть, - это заполнение объектов Stats до 128 (или 256 байт в SandyBridge или более поздних версиях), чтобы сохранить количество разных потоков в разных строках кэша, иначе будет возникать конфликт кэширования в ЦП.

Когда только один поток читает и один пишет, вам не нужны блокировки или атомарные операции, достаточно летучих. Все еще будет некоторое противоречие потока, когда поток чтения статистики взаимодействует со строкой кэша ЦП измеряемого потока. Этого нельзя избежать, но это способ сделать это с минимальным воздействием на работающий поток; читать статистику, может быть, раз в секунду или меньше.

0 голосов
/ 22 января 2013

Еще одна альтернатива для реализации обоих методов с использованием ReentranReadWriteLock . Эта реализация защищает от состояния гонки в методе getStats, если вам нужно очистить счетчики. Также он удаляет изменяемый AtomicLong из getStats и использует неизменный Long.

public class StatsService {

    private final Map<String, AtomicLong> stats = new HashMap<String, AtomicLong>(1000);
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    public void  notify(final String key) {
        r.lock();
        AtomicLong count = stats.get(key);
        if (count == null) {
            r.unlock();
            w.lock();
            count = stats.get(key);
            if(count == null) { 
                count = new AtomicLong();
                stats.put(key, count);
            }
            r.lock();
            w.unlock();
        }
        count.incrementAndGet();
        r.unlock();
    }

    public Map<String, Long> getStats() {
        w.lock();

        Map<String, Long> copy = new HashMap<String, Long>();
        for(Entry<String,AtomicLong> entry : stats.entrySet() ){
                copy.put(entry.getKey(), entry.getValue().longValue());
        }
        stats.clear();
        w.unlock();

        return copy;
    }
}

Надеюсь, это поможет, любые комментарии приветствуются!

0 голосов
/ 29 марта 2010

Если мы игнорируем часть сбора урожая и сосредотачиваемся на написании, основное узкое место программы заключается в том, что статистика заблокирована на очень грубом уровне детализации. Если два потока хотят обновить разные ключи, они должны ждать.

Если вы заранее знаете набор ключей и можете предварительно инициализировать карту, чтобы к моменту поступления потока обновления ключ гарантированно существовал, вы могли бы сделать блокировку переменной аккумулятора вместо всей карты. или используйте потокобезопасный объект-аккумулятор.

Вместо того, чтобы реализовывать это самостоятельно, есть реализации карт, которые разработаны специально для параллелизма и выполняют эту более детальную блокировку для вас.

Одно предостережение - это статистика, так как вам нужно было бы получить блокировки всех аккумуляторов примерно в одно и то же время. Если вы используете существующую дружественную к параллелизму карту, возможно, существует конструкция для получения снимка.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...