Устранение Java ошибки видимости потока и параллелизма с помощью вычисления карты - PullRequest
1 голос
/ 24 марта 2020

Я использую Java 8. У меня есть обработчик событий, который принимает события с высокой скоростью (n в секунду), и я хочу передать sh их в хранилище, когда я получу их так много (в этом упрощенный пример 1000)

Есть ли у меня ошибка видимости в строке 25 myCache.get(event.getKey()).add(event.getBean());? Должен ли я синхронизироваться по методу handleEvent()?

public class myClass extends MySimpleEventHanlder {
    private Map<String, List<MyBean>> myCache;
    private ScheduledExecutorService scheduler;

    public void MyClass() {
        myCache = new ConcurrentHashMap<String, List<MyBean>>();
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {

            for (Iterator<Map.Entry<String, List<MyBean>>> it = myCache.entrySet().iterator(); it.hasNext();) {
                Map.Entry<String, List<MyBean>> entry = it.next();
                if (entry.getValue().size() >= 1000) {
                    it.remove();
                    //do some more processing , flush to storage
                }
            }
        }, 0, 60, TimeUnit.SECONDS);
    }

    @Override
    public void handleEvent(Event event) {

        if (myCachetCache.containsKey(event.getKey())) {
            myCache.get(event.getKey()).add(event.getBean());
        }
        else{
            List<MyBean> beans = new ArrayList<MyBeans>();
            beans.add(event.getBean());
            myCache.put(event.key, beans);
        }
    }
}

1 Ответ

2 голосов
/ 24 марта 2020

У вас определенно есть проблемы с видимостью: вы добавляете элементы в ArrayList в одном потоке и читаете size () из этого ArrayList в другом потоке без синхронизации между ними.

Другая проблема заключается в том, что ключ может быть удалено между вызовами на myCache.containsKey и myCache.get. Это может вызвать исключение NullPointerException. Это можно решить с помощью compute, который гарантированно будет атомом c.

    myCache.compute(event.getKey(), (key, value) -> {
        if (value == null) {
            value = new ArrayList<>();
        }
        value.add(event.getBean());
        return value;
    });
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...