[ConcurrentHashMap]: операция обновления / получения карты возвращает устаревшее значение из карты в однопоточной операции. - PullRequest
1 голос
/ 16 июня 2020

Для некоторых key карта все еще имеет устаревшее значение. Новое обновление не отображается для нескольких ключей. Эта ошибка относится к категории not reproducible.

Код:

 class DemoCache{

    private ConcurrentHashMap<String,Demo> demoByName = new ConcurrentHashMap<String,Demo>();
    private ConcurrentHashMap<String,Demo> demoByID = new ConcurrentHashMap<String,Demo>();

    public initializeFromDB(){

        log.info(me + "Initializing/refresh  instrument from database.");
        DemoDbDynamo demoDbDynamo = new DemoDbDynamo();

        final AtomicInteger progressCounter = new AtomicInteger();
        try
        {       
            demoDbDynamo.listAll()
                .stream()
                .peek(i -> progressCounter.incrementAndGet())
                .forEach(this::updateCache);            
        }
        catch (Exception e)
        {
            log.error(me + "Exception fetching demo cache  from table. " + e );
        }
        log.info(me + "count: " + progressCounter.get());

      }

    }

    public void updateCache(Demo demo){

        Demo existing = demoByID.get(demo.getID());

        demoByID.put( dmeo.getID(), demo );
        // this is an updated existing demo
        if(existing != null) {
            //if name have changed
            demoByName.remove(existing.getName());
        }

        demoByName.put      ( demo.getName(),       demo );
        //logging new value -- demo.getName and demo.getValue
        //logging old value - demo.getName and demo.getValue

    }
}

Размер карты: 6k

Вся операция является однопоточной. JMS topi c попадает в этот класс для инициализации карты из БД. У нас есть 4 разных сервера, и на каждом сервере есть этот локальный кеш, который получил refre sh от JMS Topi c msg. Из 4 серверов 3 обновляются со всеми правильными значениями, а 1 сервер все еще сохраняет устаревшие значения для нескольких ключей.

Что может быть причиной root этой проблемы?

обновлений результатов из комментария:

Журналы:

JMS msg

ip-10-0-33-185 20:15:40.374 [ThreadName=ActiveMQ Session Task-72] DEBUG cache.DemoCache {} -- DemoCache.onMessage() : [msg=<response mt='5099'/>]

ip-10-0-33-185 20:15:40.375 [ThreadName=ActiveMQ Session Task-72] INFO  cache.DemoCache {} -- DemoCache.initializeFromDB(): Initializing/refresh  cache from database.

ip-10-0-33-185 20:15:45.897 [ThreadName=ActiveMQ Session Task-72] DEBUG cache.DemoCache {} -- 
[newObject=[NewID=06926627-e950-48f3-9c53-b679f61120ec newName=foo,newValue=2640.98]]
[OldObject=[oldName=foo,oldValue=2641.05]]

ip-10-0-33-185 20:15:45.913 [ActiveMQ Session Task-72] INFO  cache.DemoCache {} -- DemoCache.initializeFromDB(): count: 5362

Здесь _collector = ip-10-0-33-185, то есть серверный узел. Этот сервер возвращает старое значение = 2641,05 вместо нового значения. Выполняется только один поток ActiveMQ Session Task-72. Других обсуждений не вижу.

1 Ответ

1 голос
/ 16 июня 2020

Несмотря на использование ConcurrentHashMap, код не потокобезопасен .

Вот один из возможных сценариев возникновения несогласованности. Предположим, кеш содержит значение demo0 с ID = "X". Предположим, что есть 2 потока, A и B. Поток A имеет следующую версию объекта ibject с тем же идентификатором, например demo1. Поток B знает это demo1 и тем временем получил его более новую версию, demo2.

Теперь оба потока хотят обновить кеш.

Поток A вызывает updateCache() с значение demo1. После вызова Demo existing = ... и перед вызовом demoByID.put(...) этот поток приостанавливается и выполняется поток B. Поток B вызывает updateCache() со значением demo2, не прерывается и успешно помещает значение demo2 в кеш.

Теперь поток A продолжается. Выполняется demoByID.put(...) et c. Но работает с demo1. Таким образом, он помещает demo1 в кеш, заменяя более новую версию demo2 в demoByID и demoByName.

Что вы можете сделать?

Все операции между проверкой существования и изменение кеша должно выполняться как блок с одним потоком. Например, используйте lock для всего содержимого метода updateCache() или используйте метод declare updateCache() synchronized .

...