Java итерация concurrentHashMap - PullRequest
0 голосов
/ 07 мая 2020

Я использую поток (назовем его «MapChecker»), который в течение всего времени существования зацикливается на ConcurrentHashMap.

Карта заполняется из других потоков и очищается MapChecker с помощью итераторов поверх это.

Карта имеет следующую структуру:

private volatile Map<MyObject, SynchronizedList<MyOtherObject>> map = new ConcurrentHashMap<>();
//SynchronizedList = Collections.syncrhonizedList.

MapChecker должен обновить значения каждого ключа внутри своего l oop. Обновления производятся путем удаления элементов из списков или полного удаления записи карты.

Синхронизация происходит в два этапа:

  1. При добавлении данных внутри карты (синхронизировано)
  2. При получении итератора карты (синхронизируется внутри MapChecker).

Блокировки находятся на самой карте (синхронизированы (map)).


Я не позаботьтесь о том, чтобы в моем представлении итератора всегда были последние обновленные значения, но я должен быть уверен, что все недостающие значения будут извлечены в следующих итерациях, это важно, потому что я не хочу пропускать какой-либо элемент. Кроме того, важно также правильно обновить SynchronizedList.

Мой вопрос: могу ли я быть уверенным, что все записи будут вставлены / обновлены с помощью такой архитектуры? Есть риск что-то упустить? Что произойдет, если MapChecker удалит запись, пока другой поток обновляет ту же запись? ConcurrentHashMap должен блокировать эту операцию, поэтому я не ожидаю никаких проблем.


Это MapChecker l oop:

while (!isInterrupted()) {

    executeClearingPhases();

    Iterator<Map.Entry<PoolManager, List<PooledObject>>> it = null;
    synchronized (idleInstancesMap) {
        it = idleInstancesMap.entrySet().iterator();
    }

    while (it.hasNext()) {

        Map.Entry<PoolManager, List<PooledObject>> entry = it.next();
        PoolManager poolManager = entry.getKey();

        boolean stop = false;
        while (!stop) {
            //this list is empty very often but it shouldn't, that's the problem I am facing. I need to assure updates visibility.
            List<PooledObject> idlePooledObjects = entry.getValue();
            if (idlePooledObjects.isEmpty()) {

                stop = true;
            } else {

                PooledObject pooledObject = null;
                try {

                    pooledObject = idlePooledObjects.get(0);
                    info(loggingId, " - REMOOOVINNGG:  \"", pooledObject.getClientId(), "\".");
                    PoolingStatus destroyStatus = poolManager.destroyIfExpired(pooledObject);
                    switch (destroyStatus) {

                        case DESTROY:
                            info(loggingId, " - Removed pooled object \"", pooledObject.getClientId(), "\" from pool: \"", poolManager.getClientId(), "\".");
                            idlePooledObjects.remove(0);
                            break;
                        case IDLE:
                            stop = true;
                            break;
                        default:
                            idlePooledObjects.remove(0);
                            break;
                    }
                } catch (@SuppressWarnings("unused") PoolDestroyedException e) {

                    warn(loggingId, " - WARNING: Pooled object \"", pooledObject.getClientId(), "\" skipped, pool: \"", poolManager.getClientId(), "\" has been destroyed.");
                    synchronized(idleInstancesMap) {

                        it.remove();
                    }
                    stop = true;
                } catch (PoolManagementException e) {

                    error(e, loggingId, " - ERROR: Errors occurred during the operation.");
                    idlePooledObjects.remove(0);
                }
            }
        }
    }
    Thread.yield();
}

Это метод, который вызывается (много раз) любой другой поток:

public void addPooledObject(PoolManager poolManager, PooledObject pooledObject) {

        synchronized (idleInstancesMap) {

            List<PooledObject> idleInstances = idleInstancesMap.get(poolManager);
            if (idleInstances == null) {

                idleInstances = Collections.synchronizedList(new LinkedList<PooledObject>());
                idleInstancesMap.put(poolManager, idleInstances);
            }

            idleInstances.add(pooledObject);
        }
    }

Спасибо

Ответы [ 2 ]

1 голос
/ 07 мая 2020

Благодаря предложению Патрика Чена я переместил список экземпляров PooledObject в каждый PoolManager (который уже владеет этим списком, так как он владеет пулом и его внутренним состоянием полностью синхронизированным образом).

Это результат:

//MapChecker lifecycle
public void run() {

    try {

        while (!isInterrupted()) {

            executeClearingPhases();

            ListIterator<PoolManager> it = null;
            //This really helps. poolManagers is the list of PoolManager instances.
            //It's unlikely that this list will have many elements (maybe not more than 20)
            synchronized (poolManagers) {

                Iterator<PoolManager> originalIt = poolManagers.iterator();

                while (originalIt.hasNext()) {

                    if (originalIt.next().isDestroyed()) {

                        originalIt.remove();
                    }
                }
                //This iterator will contain the current view of the list.
                //It will update on the next iteration.
                it = new LinkedList<PoolManager>(poolManagers).listIterator();
            }

            while (it.hasNext()) {

                PoolManager poolManager = it.next();
                try {
                    //This method will lock on its internal synchronized pool in order to
                    //scan for expired objects.
                    poolManager.destroyExpired();
                } catch (@SuppressWarnings("unused") PoolDestroyedException e) {

                    warn(loggingId, " - WARNING: Pool: \"", poolManager.getClientId(), "\" has been destroyed.");
                    it.remove();
                }
            }
            Thread.yield();
        }
        throw new InterruptedException();
    } catch (@SuppressWarnings("unused") InterruptedException e) {

        started = false;
        Thread.currentThread().interrupt();
        debug(loggingId, " - Pool checker interrupted.");
    }
}
//Method invoked by multiple threads
public void addPooledObject(PoolManager poolManager) {

    synchronized (poolManagers) {

        poolManagers.add(poolManager);
    }
}
0 голосов
/ 07 мая 2020

но мне нужно быть уверенным, что все недостающие значения будут получены в следующих итерациях, это важно, потому что я не хочу пропускать какой-либо элемент.

Во-первых, я думаю, основываясь на своем решении, я думаю, вы получите все элементы на карте, пока продолжаете выполнять MapChecker loop. Я предлагаю вам иметь дополнительное время (true) l oop вне кода MapChecker, который вы указали.

Но, исходя из всего вашего описания, я предлагаю вам использовать Queue вместо Map, очевидно, ваша проблема требует операции push / pop, возможно, BlockingQueue здесь больше подходит.

...