Отсутствуют обновления с блокировками и ConcurrentHashMap - PullRequest
0 голосов
/ 13 февраля 2019

У меня есть сценарий, в котором я должен поддерживать Map, который может быть заполнен несколькими потоками, каждый из которых изменяет свой соответствующий List (уникальный идентификатор / ключ, являющийся именем потока), и когда размер списка для потокапревышает фиксированный размер пакета, мы должны сохранить записи в базе данных.

Класс агрегатора

private volatile ConcurrentHashMap<String, List<T>>  instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReentrantLock lock ;

public void addAll(List<T> entityList, String threadName) {
    try {
        lock.lock();
        List<T> instrumentList = instrumentMap.get(threadName);
        if(instrumentList == null) {
            instrumentList = new ArrayList<T>(batchSize);
            instrumentMap.put(threadName, instrumentList);
        }

        if(instrumentList.size() >= batchSize -1){
            instrumentList.addAll(entityList);
            recordSaver.persist(instrumentList); 
            instrumentList.clear();
        } else {
            instrumentList.addAll(entityList);  
        }
    } finally {
        lock.unlock();
    }

}

Через каждые 2 минуты запускается еще один отдельный поток (с использованием той же блокировки)сохранить все записи в Map (чтобы убедиться, что что-то сохраняется через каждые 2 минуты и размер карты не становится слишком большим)

if(//Some condition) {
    Thread.sleep(//2 minutes);
    aggregator.getLock().lock();
    List<T> instrumentList = instrumentMap.values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
    if(instrumentList.size() > 0) {
        saver.persist(instrumentList);
        instrumentMap .values().parallelStream().forEach(x -> x.clear());
        aggregator.getLock().unlock();
    }
}

Это решение работает нормально практически для каждого сценариячто мы проверили, за исключением того, что иногда мы видим, что некоторые записи пропали без вести, то есть они вообще не сохраняются, хотя они были добавлены в карту в порядке.

Мои вопросы:

  1. В чем проблема с этим кодом?
  2. Не является ли ConcurrentHashMap лучшим решением здесь?
  3. Есть ли проблема в List, используемом с ConcurrentHashMap?
  4. Должен ли я использовать здесь метод вычисления ConcurrentHashMap (думаю, не нужно, поскольку ReentrantLock уже выполняет ту же работу)?

Ответы [ 3 ]

0 голосов
/ 19 февраля 2019

Похоже, что это была попытка оптимизации там, где она не была нужна.В этом случае чем меньше, тем лучше, и чем проще, тем лучше.В приведенном ниже коде используются только две концепции параллелизма: synchronized для обеспечения правильного обновления общего списка и final для обеспечения того, что все потоки видят одно и то же значение.

import java.util.ArrayList;
import java.util.List;

public class Aggregator<T> implements Runnable {

    private final List<T> instruments = new ArrayList<>();

    private final RecordSaver recordSaver;
    private final int batchSize;


    public Aggregator(RecordSaver recordSaver, int batchSize) {
        super();
        this.recordSaver = recordSaver;
        this.batchSize = batchSize;
    }

    public synchronized void addAll(List<T> moreInstruments) {

        instruments.addAll(moreInstruments);
        if (instruments.size() >= batchSize) {
            storeInstruments();
        }
    }

    public synchronized void storeInstruments() {

        if (instruments.size() > 0) {
            // in case recordSaver works async
            // recordSaver.persist(new ArrayList<T>(instruments));
            // else just:
            recordSaver.persist(instruments);
            instruments.clear();
        }
    }


    @Override
    public void run() {

        while (true) {
            try { Thread.sleep(1L); } catch (Exception ignored) {
                break;
            }
            storeInstruments();
        }
    }


    class RecordSaver {
        void persist(List<?> l) {}
    }

}
0 голосов
/ 02 апреля 2019

Ответ, предоставленный @Slaw в комментариях, сделал свое дело.Мы позволяли экземпляру instrumentList не синхронизироваться, т.е. доступ / операции выполняются по списку без какой-либо синхронизации.Исправив то же самое, передав копию другим методам, добились цели.

Следующая строка кода - та, где эта проблема возникала

recordSaver.persist (instrumentList);instrumentList.clear ();

Здесь мы позволяем экземпляру instrumentList не синхронизироваться, т. е. он передается другому классу (recordSaver.persist), где он былчтобы действовать, но мы также очищаем список в следующей строке (в классе Aggregator), и все это происходит несинхронизированным способом.Состояние списка нельзя предсказать в заставке ... действительно глупая ошибка.

Мы исправили проблему, передав клонированную копию instrumentList в recordSaver.persist (...)метод.Таким образом, instrumentList.clear () не влияет на список, доступный в recordSaver для дальнейших операций.

0 голосов
/ 17 февраля 2019

Я вижу, что вы используете ConcurrentHashMap parallelStream в пределах блокировки.Я не обладаю достаточными знаниями о поддержке потоков Java 8+, но быстрый поиск показывает, что

  1. ConcurrentHashMap - это сложная структура данных, которая в прошлом имела ошибки параллелизма
  2. Параллельные потоки должнысоблюдать сложные и плохо документированные ограничения на использование
  3. Вы изменяете свои данные в параллельном потоке

Основываясь на этой информации (и моих проблемах с параллелизмом, вызванных кишками)детектор ™), я полагаю, что удаление вызова parallelStream может повысить надежность вашего кода.Кроме того, как упомянул @Slaw, вы должны использовать обычный HashMap вместо ConcurrentHashMap, если все использование instrumentMap уже защищено блокировкой.

Конечно, поскольку вы не публикуете код recordSaver возможно, что в нем также есть ошибки (и не обязательно связанные с параллелизмом).В частности, вы должны убедиться, что код, который читает записи из постоянного хранилища - тот, который вы используете для обнаружения потери записей - безопасен, корректен и правильно синхронизирован с остальной частью вашей системы (предпочтительно с помощью надежногостандартная база данных SQL).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...