Параллельная генерация восходящего идентификатора без блокировки - PullRequest
0 голосов
/ 22 сентября 2018

У меня есть карта, которая должна связывать строки с идентификатором.Должны быть не пробелы между идентификаторами, и они должны быть уникальными целыми числами от 0 до N.

Запрос всегда приходит с двумя строками, одна из которых, обе или ни одна из них, возможно, уже проиндексированы.Карта строится параллельно из пула ForkJoin, и в идеале я бы хотел избежать явных синхронизированных блоков.Я ищу оптимальный способ максимизировать пропускную способность с блокировкой или без нее.

Я не вижу, как использовать AtomicInteger без создания последовательных пропусков для ключей, которые уже присутствовали на карте.

public class Foo {
    private final Map<String, Integer> idGenerator = new ConcurrentHashMap<>();

    // invoked from multiple threads
    public void update(String key1, String key2) {
      idGenerator.dosomething(key, ?) // should save the key and unique id
      idGenerator.dosomething(key2, ?) // should save the key2 and its unique id
      Bar bar = new Bar(idGenerator.get(key), idGenerator.get(key2));
      // ... do something with bar
   }
}

Я думаю, что метод size() в сочетании с merge() может решить проблему, но я не могу убедить себя в этом.Может ли кто-нибудь предложить подход к этой проблеме?

РЕДАКТИРОВАТЬ

Что касается дублирующего флага, это не может быть решено с помощью AtomicInteger.incrementAndGet(), как предлагается в связанном ответе.Если бы я делал это вслепую для каждой строки, в последовательностях было бы пробелов .Необходима операция составная , которая проверяет, существует ли ключ, и только затем генерирует идентификатор.Я искал способ реализовать такую ​​сложную операцию через Map API.

Второй предоставленный ответ идет вразрез с требованиями, которые я специально изложил в вопросе.

Ответы [ 2 ]

0 голосов
/ 22 сентября 2018

Я не уверен, что вы можете делать именно то, что вы хотите.Тем не менее, вы можете пакетировать некоторые обновления или выполнять проверку отдельно от перечисления / добавления.

Многие из этого ответа предполагают, что порядок не важен: вам нужны все строки с указанным числом, но переупорядочениедаже в паре нормально, верно?Параллелизм может уже привести к переупорядочению пар или к тому, что члены пары не получат смежные числа, но переупорядочение может привести к тому, что первая пара получит большее число.

задержка не так важна.Это приложение должно жевать большое количество данных и в конечном итоге производить вывод.Большую часть времени на карте должен быть результат поиска

Если большинство запросов выполняет поиск, нам в основном нужна пропускная способность чтения на карте.

Может быть достаточно одного потока записи.

Таким образом, вместо непосредственного добавления в основную карту параллельные читатели могут проверить свои входные данные, а если их нет, добавить их в очередь для перечисления и добавить в основной ConcurrentHashMap. Очередь может представлять собой простую очередь без блокировки или другую ConCurrentHashMap, чтобы также отфильтровывать дубликаты из еще не добавленных кандидатов.Но, вероятно, очередь без блокировки хороша.

Тогда вам не нужен атомный счетчик, или у вас возникнут проблемы с двумя потоками, увеличивающими счетчик дважды, когда они видят одну и ту же строку, прежде чем любой из них сможет добавить ее вкарта.(Потому что в противном случае это большая проблема.)

Если у писателя есть способ заблокировать ConcurrentHashMap, чтобы сделать пакет обновлений более эффективным, это может быть хорошо.Но если ожидается, что частота попаданий будет достаточно высокой, вы действительно хотите, чтобы другие потоки читателей продолжали фильтровать дубликаты в максимально возможной степени, пока мы увеличиваем их, вместо того, чтобы приостанавливать это.


Чтобы уменьшить конфликт междув основных интерфейсных потоках у вас может быть несколько очередей, например, каждый поток имеет очередь одного производителя / одного потребителя или группа из четырех потоков, работающих на паре физических ядер, совместно использует одну очередь.

Поток перечисления читает от всех из них.

В очереди, где читатели не борются с авторами, поток перечисления не имеет конкуренции.Но множественные очереди уменьшают конфликт между авторами.(Потоки, записывающие эти очереди, являются потоками, которые обращаются к основному доступу только для чтения ConcurrentHashMap, где большая часть процессорного времени будет расходоваться при высоких показателях совпадений.)


Какой-то read-copy-update (RCU) структура данных может быть хорошей, если у Java есть это .Это позволило бы читателям продолжать отфильтровывать дубликаты на полной скорости, в то время как поток перечисления создает новую таблицу с выполнением пакета вставок с нулевым конфликтом при создании новой таблицы.


С помощью 90% попаданий, один поток записи может поддерживать не более 10 потоков чтения, которые фильтруют новые ключи по основной таблице.

Возможно, вы захотите установить некоторое ограничение размера очереди, чтобы учесть обратное давление со стороныодин автор темы.Или, если у вас гораздо больше ядер / потоков, чем может выдержать один писатель, когда может быть полезен какой-то параллельный набор, позволяющий нескольким потокам удалять дубликаты до нумерации.

Или действительно, если вы можете простоподождите до конца, чтобы все пронумеровать, я думаю, это будет намного проще.

Я думал о том, что, возможно, попытаюсь подсчитать, оставив место для ошибки в условиях гонки, и затем вернуться, чтобы исправить ситуацию, но этонаверное не лучше.

0 голосов
/ 22 сентября 2018

Нет способа сделать это именно так, как вы хотите - ConcurrentHashMap сам по себе не блокируется.Однако вы можете сделать это атомарно, без необходимости какого-либо явного управления блокировками, используя функцию java.util.Map.computeIfAbsent .

Вот пример кода в стиле того, что вы предоставилиэто должно помочь вам.

ConcurrentHashMap<String, Integer> keyMap = new ConcurrentHashMap<>();
AtomicInteger sequence = new AtomicInteger();

public void update(String key1, String key2) {
    Integer id1 = keyMap.computeIfAbsent(key1, s -> sequence.getAndIncrement());
    Integer id2 = keyMap.computeIfAbsent(key2, s -> sequence.getAndIncrement());

    Bar bar = new Bar(id1, id2);
    // ... do something with bar
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...