У меня есть сценарий, в котором я должен поддерживать Map
, который может быть заполнен несколькими потоками, каждый из которых изменяет свой соответствующий List
(уникальный идентификатор / ключ, являющийся именем потока), и когда размер списка для потокапревышает фиксированный размер пакета, мы должны сохранить записи в базе данных.
Класс агрегатора
private volatile ConcurrentHashMap<String, List<T>> instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReentrantLock lock ;
public void addAll(List<T> entityList, String threadName) {
try {
lock.lock();
List<T> instrumentList = instrumentMap.get(threadName);
if(instrumentList == null) {
instrumentList = new ArrayList<T>(batchSize);
instrumentMap.put(threadName, instrumentList);
}
if(instrumentList.size() >= batchSize -1){
instrumentList.addAll(entityList);
recordSaver.persist(instrumentList);
instrumentList.clear();
} else {
instrumentList.addAll(entityList);
}
} finally {
lock.unlock();
}
}
Через каждые 2 минуты запускается еще один отдельный поток (с использованием той же блокировки)сохранить все записи в Map
(чтобы убедиться, что что-то сохраняется через каждые 2 минуты и размер карты не становится слишком большим)
if(//Some condition) {
Thread.sleep(//2 minutes);
aggregator.getLock().lock();
List<T> instrumentList = instrumentMap.values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
if(instrumentList.size() > 0) {
saver.persist(instrumentList);
instrumentMap .values().parallelStream().forEach(x -> x.clear());
aggregator.getLock().unlock();
}
}
Это решение работает нормально практически для каждого сценариячто мы проверили, за исключением того, что иногда мы видим, что некоторые записи пропали без вести, то есть они вообще не сохраняются, хотя они были добавлены в карту в порядке.
Мои вопросы:
- В чем проблема с этим кодом?
- Не является ли
ConcurrentHashMap
лучшим решением здесь? - Есть ли проблема в
List
, используемом с ConcurrentHashMap
? - Должен ли я использовать здесь метод вычисления
ConcurrentHashMap
(думаю, не нужно, поскольку ReentrantLock
уже выполняет ту же работу)?