Java: получить + очистить атомарный для карты - PullRequest
0 голосов
/ 08 января 2019

Я хотел бы реализовать следующую логику:

- должна использоваться следующая структура

//Map<String, CopyOnWriteArrayList> keeping the pending updates 
//grouped by the id of the updated object
final Map<String, List<Update>> updatesPerId = new ConcurrentHashMap<>();

-n производители будут добавлять обновления на карту updatesPerId (для одного и того же идентификатора можно добавить 2 обновления одновременно)

- один TimerThread будет запускаться время от времени и должен обрабатывать полученные обновления. Что-то вроде:

 final Map<String, List<Update>> toBeProcessed = new HashMap<>(updatesPerId);
 updatesPerId.clear();
 // iterate over toBeProcessed and process them

Есть ли способ сделать этот логический поток безопасным без синхронизации логики добавления от производителей и логики timerThread (потребителя)? Я думаю о Atomic Clear + Get, но кажется, что ConcurrentMap не предоставляет что-то подобное. Также я должен упомянуть, что обновления должны храниться с обновленным идентификатором объекта, чтобы я не мог заменить карту очередью или чем-то еще.

Есть идеи? Спасибо!

Ответы [ 3 ]

0 голосов
/ 08 января 2019

Есть ли способ сделать этот логический поток безопасным без синхронизации логики добавления от производителей и логики timerThread (потребителя)?

Короче, нет - в зависимости от того, что вы подразумеваете под "синхронизацией".

Самый простой способ - превратить ваш Map в собственный класс.

class UpdateManager {
    Map<String,List<Update>> updates = new HashMap<>();
    public void add(Update update) {
        synchronized (updates) {
            updates.computeIfAbsent(update.getKey(), k -> new ArrayList<>()).add(update);
        }
    }
    public Map<String,List<Update>> getUpdatesAndClear() {
        synchronized (updates) {
            Map<String,List<Update>> copy = new HashMap<>(updates);
            updates.clear();
            return copy;
        }
    }
}
0 голосов
/ 08 января 2019

Я бы предложил использовать LinkedBlockingQueue вместо CopyOnWriteArrayList в качестве значения карты. С COWAL добавление становится все дороже, поэтому добавление N элементов приводит к производительности N ^ 2. LBQ сложение O (1). Кроме того, LBQ имеет drainTo, который можно эффективно использовать здесь. Вы могли бы сделать это:

final Map<String, Queue<Update>> updatesPerId = new ConcurrentHashMap<>();

Производитель:

updatesPerId.computeIfAbsent(id, LinkedBlockingQueue::new).add(update);

Потребитель:

updatesPerId.forEach((id, queue) -> {
    List<Update> updates = new ArrayList<>();
    queue.drainTo(updates);
    processUpdates(id, updates);
});

Это несколько отличается от того, что вы предложили. Этот метод обрабатывает обновления для каждого идентификатора, но позволяет производителям продолжать добавлять обновления на карту, пока это происходит. Это оставляет записи карты и очереди на карте для каждого идентификатора. Если идентификаторы в конечном итоге будут многократно использоваться, количество записей на карте увеличится до максимальной отметки.

Если новые идентификаторы постоянно появляются, а старые идентификаторы становятся неиспользуемыми, карта будет постоянно расти, что, вероятно, не то, что вам нужно. Если это так, вы можете использовать эту технику в ответе Энди Тернера .

Если потребителю действительно нужно сделать снимок и очистить всю карту, я думаю, что вы должны использовать блокировку, которую вы хотели избежать.

0 голосов
/ 08 января 2019

Вы можете использовать тот факт, что ConcurrentHashMap.compute выполняется атомарно .

Вы можете положить в updatesPerId вот так:

updatesPerId.compute(k, (k, list) -> {
  if (list == null) list = new ArrayList<>();
  // ... add to the list

  // Return a non-null list, so the key/value pair is stored in the map.
  return list;
});

Это , а не с использованием computeIfAbsent и последующим добавлением к возвращаемому значению, которое не будет атомарным.

Тогда в вашей ветке убрать вещи:

for (String key : updatesPerId.keySet()) {
  List<Update> list = updatesPerId.put(key, null);
  updatesPerId.compute(key, (k, list) -> {
    // ... Process the contents of the list.

    // Removes the key/value pair from the map.
    return null;
  });
}

Таким образом, добавление ключа в список (или обработка всех значений для этого ключа) может блокировать, если вы попытаетесь обработать ключ в обоих местах одновременно; в противном случае он не будет заблокирован.


Редактировать: как указано @StuartMarks, возможно, было бы лучше сначала просто извлечь все вещи из карты, а затем обработать их позже, чтобы не блокировать другие потоки, пытающиеся добавить:

Map<String, List<Update>> newMap = new HashMap<>();
for (String key : updatesPerId.keySet()) {
  newMap.put(key, updatesPerId.remove(key));
}
// ... Process entries in newMap.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...