Параллелизм Java в пут / получить в коллекциях - PullRequest
4 голосов
/ 29 ноября 2011

Я подключаюсь к внешней службе с интерактивным сеансом + частный канал (InputStream), который работает в отдельных потоках. В интерактивном сеансе я отправляю исходящие сообщения и получаю синхронные ответы с объектом, содержащим разные поля, одно из которых является идентификатором, а «состояние» подтверждает успех или неудачу. Одновременно я получаю сообщения в приватной ленте для этого идентификатора с последующими обновлениями статуса. В настоящее время я храню информацию о состоянии каждого идентификатора в ConcurrentHashMap. Крайне важно, чтобы я держал правильную последовательность событий на этих объектах, но в настоящее время я получаю условия гонки, в которых я иногда обрабатываю и обновляю объекты в приватном фиде, прежде чем получу и обработаю синхронный ответ на интерактивном сеансе, поэтому у меня остается устаревший и неправильный статус для идентификатора.

В идеале мне бы хотелось иметь некоторый тип коллекции с методом PutIfKeyExistOrWait (w timeout), который обновлял бы значение, только если ключ существует, или ждал, что я мог бы использовать при обработке объектов в закрытом фиде.

Кто-нибудь знает, есть ли подходящая коллекция, или может предложить альтернативное решение моей проблемы? Спасибо.

Ответы [ 3 ]

2 голосов
/ 29 ноября 2011

Вы можете попытаться инкапсулировать логику для обработки этой ситуации в значения вашей карты, что-то вроде этого:

  • Если поток подачи первым добавляет значение для определенного идентификатора, это значение считаетсяне завершено, и поток ожидает его завершения
  • Если интерактивный поток сеанса не первым добавляет значение, он отмечает это неполное значение как завершенное
  • При получении их из неполных значений они считаются отсутствующимикарта

Это решение основано на атомарности putIfAbsent().

public class StatusMap {
    private Map<Long, StatusHolder> map = new ConcurrentHashMap<Long, StatusHolder>();

    public Status getStatus(long id) {
        StatusHolder holder = map.get(id);
        if (holder == null || holder.isIncomplete()) {
            return null;
        } else {
            return holder.getStatus();
        }
    }

    public void newStatusFromInteractiveSession(long id, Status status) {
        StatusHolder holder = StatusHolder.newComplete(status);
        if ((holder = map.putIfAbsent(id, holder)) != null) {
            holder.makeComplete(status); // Holder already exists, complete it
        } 
    }

    public void newStatusFromFeed(long id, Status status) {
        StatusHolder incomplete = StatusHolder.newIncomplete();
        StatusHolder holder = null;
        if ((holder = map.putIfAbsent(id, incomplete)) == null) {
            holder = incomplete; // New holder added, wait for its completion
            holder.waitForCompletion();
        }
        holder.updateStatus(status);
    }
}

public class StatusHolder {
    private volatile Status status;
    private volatile boolean incomplete;
    private Object lock = new Object();

    private StatusHolder(Status status, boolean incomplete) { ... }

    public static StatusHolder newComplete(Status status) {
        return new StatusHolder(status, false);
    }

    public static StatusHolder newIncomplete() {
        return new StatusHolder(null, true);
    }

    public boolean isIncomplete() { return incomplete; }

    public void makeComplete(Status status) {
        synchronized (lock) {
            this.status = status;
            incomplete = false;
            lock.notifyAll();
        }
    }

    public void waitForCompletion() {
        synchronized (lock) {
            while (incomplete) lock.wait();
        }
    }
    ...
}
0 голосов
/ 29 ноября 2011

У вас уже есть ConcurrentHashMap iDAndStatus, в котором хранится идентификатор и последний статус.Однако я бы только позволил потоку, который имеет дело со службой, создать новую запись в этой карте.

Когда сообщение приходит из канала, если идентификатор уже существует в iDAndStatus, он просто изменяет статус,Если ключ не существует, просто временно сохраните обновления идентификатора / статуса в некоторой другой структуре данных, pendingFeedUpdates.

Каждый раз, когда в iDAndStatus создается новая запись, проверяйте pendingFeedUpdates, чтобы увидеть, присутствуют ли какие-либо обновления для нового идентификатора.

Я не уверен, какую синхронизированную структуру данных использовать для pendingFeedUpdates: вам нужно получить по идентификатору, но вы можете иметь много сообщений для каждого идентификатора, и вы хотите сохранить порядок сообщений.Может быть, синхронизированный HashMap, который связывает каждый идентификатор с каким-либо типом синхронизированной упорядоченной очереди?

0 голосов
/ 29 ноября 2011

Я бы посоветовал вам взглянуть на коллекцию Collections.getSynchronized: http://docs.oracle.com/javase/1.4.2/docs/api/java/util/Collections.html#synchronizedList%28java.util.List%29

Возможно, это решит проблему с другим вариантом, в зависимости от того, как выполняются вызовы, у метода должен быть синхронизированный метод, обеспечивающий безопасное выполнение потока иобеспечит атомарность сделки.См. http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html

Третий вариант заключается в применении контроля управления параллелизмом в приложении, следуя оптимистическому или пессимистическому подходу в зависимости от того, чего вы пытаетесь достичь.Это самый сложный из 3, но даст вам больший контроль, если сочетать его с предыдущими опциями.

Это действительно зависит от вашей конкретной реализации.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...