Как обрабатывать синхронизированный доступ к списку на карте? - PullRequest
1 голос
/ 09 ноября 2010

ОБНОВЛЕНИЕ: Обратите внимание.
На вопрос, который я задал, был дан ответ.К сожалению для меня, проблема гораздо больше, чем вопрос в заголовке.Помимо добавления новых записей на карту, мне приходилось обрабатывать обновления и удаления одновременно.Сценарий, который я имею в виду, кажется невозможным реализовать без того или другого:
a.тупики б.сложные и трудоемкие проверки и блокировки
Проверьте нижнюю часть вопроса для окончательных мыслей.

ОРИГИНАЛЬНАЯ ПОЧТА:

Привет,

Iу меня есть подпружиненный компонент с картой.

Вот то, для чего я хочу его использовать:

несколько одновременных слушателей JMS будут получать сообщения с действиями.Каждое действие состоит из двух пользователей: long userA и long userB.Сообщение будет иметь собственную очередь String replyTo, которая будет использоваться для идентификации действия.Поскольку я не могу позволить выполнить действие, когда один из пользователей участвует в другом действии, которое выполняется, я собираюсь использовать эту карту в качестве реестра того, что происходит, и для того, чтобы контролировать выполнение действий.Допустим, я получил три действия:
1. userA, userB
2. userB, userC
3. userC, userA

Когда первое действие получено, карта пуста, поэтому ясобираюсь записать информацию о действии в нем и начать выполнение действия.
Когда получено второе действие, я вижу, что пользователь B «занят» первым действием, поэтому я просто записываю информацию о действии.
То же самое длятретье действие.

Карта будет выглядеть следующим образом:
[userA: [action1, action3],
userB: [action1, action2],
userC: [action2, action3]]

Как только первое действие будет завершено, я удалю информацию о нем из реестра и получу информацию о следующих действиях для userA и userB [action3, action2].Затем я попытаюсь перезапустить их.

Я думаю, что теперь вы получаете то, что я хочу сделать с этой картой.

Поскольку к карте будет доступ из нескольких потоков одновременно, яЯ должен как-то обрабатывать синхронизацию.

У меня будут методы для добавления новой информации на карту и удаления информации с карты, когда действие будет выполнено.Метод удаления возвращает следующие действия [если они есть] для двух пользователей, для которых действие только что завершено.

Поскольку могут быть одновременно выполнены сотни действий и процент действий с занятыми пользователями.должен быть низким. Я не хочу блокировать доступ к карте для каждой операции добавления / удаления.

Я думал о создании синхронизированного доступа только к каждому из списков на карте, чтобы обеспечить одновременный доступ к несколькимПользовательские записи одновременно.Однако ... потому что, когда для пользователя не осталось никаких действий, я хочу удалить запись для этого пользователя с карты.Также ... когда у пользователя нет записи на карте, мне придется ее создать.Я немного боюсь, что где-то там могут быть столкновения.

Как лучше всего справиться с этим сценарием?Делает ли оба метода - добавление и удаление - синхронизированным (что я считаю наихудшим сценарием) единственно правильным [безопасным] способом сделать это?

Кроме того, у меня будет еще одна карта, которая будет содержать идентификатор действия в качестве ключей иидентификаторы пользователей в качестве значений, так что проще идентифицировать / удалить пары пользователей.Я полагаю, что могу пропустить синхронизацию на этом, так как не существует сценария, когда одно действие было бы выполнено дважды одновременно.

Хотя код написан на Groovy, я считаю, что ни один Java-программист не столкнется с трудностями при чтении.За этим стоит Java.Пожалуйста, рассмотрите следующий псевдокод, поскольку я просто создаю прототип.

class UserRegistry {

    // ['actionA':[userA, userB]]
    // ['actionB':[userC, userA]]
    // ['actionC':[userB, userC]]
    private Map<String, List<Long>> messages = [:]
    /**
     * ['userA':['actionA', 'actionB'],
     * ['userB':['actionA', 'actionC'],
     * ['userC':['actionB', 'actionC']
     */
    private Map<long, List<String>> users = [:].asSynchronized()

    /**
     * Function will add entries for users and action to the registry.
     * @param userA
     * @param userB
     * @param action
     * @return true if a new entry was added, false if entries for at least one user already existed
     */
    public boolean add(long userA, long userB, String action) {
        boolean userABusy = users.containsKey(userA)
        boolean userBBusy = users.containsKey(userB)
        boolean retValue
        if (userABusy || userBBusy)  {
            if (userABusy) {
                users.get(userA).add(action)
            } else {
                users.put(userA, [action].asSynchronized())
            }
            if (userBBusy) {
                users.get(userB).add(action)
            } else {
                users.put(userB, [action].asSynchronized())
            }
            messages.put(action, [userA, userB])
            retValue = false
        } else {
            users.put(userA, [action].asSynchronized())
            users.put(userB, [action].asSynchronized())
            messages.put(action, [userA, userB])
            retValue = true
        }
        return retValue
    }

    public List remove(String action) {
        if(!messages.containsKey(action)) throw new Exception("we're screwed, I'll figure this out later")
        List nextActions = []
        long userA = messages.get(action).get(0)
        long userB = messages.get(action).get(1)
        if (users.get(userA).size() > 1) {
            users.get(userA).remove(0)
            nextActions.add(users.get(userA).get(0))
        } else {
            users.remove(userA)
        }
        if (users.get(userB).size() > 1) {
            users.get(userB).remove(0)
            nextActions.add(users.get(userB).get(0))
        } else {
            users.remove(userB)
        }
        messages.remove(action)
        return nextActions
    }
}

EDIT Я думал об этом решении вчера вечером, и кажется, что карта сообщений может исчезнуть, и пользователи Map будут:

Map users<String, List<UserRegistryEntry>>

где
UserRegistryEntry :
String actionId
логическое ожидание

теперь давайте предположим, что я получаю следующие действия:

action1: пользователь A, пользователь C
action2: пользователь A, пользователь D
действие 3: пользователь B, пользователь C
action4: пользователь B, пользователь D

Это означает, что действие 1 и действие 4 могут выполняться одновременно, а действия 2 и действие 3 блокируются. Карта будет выглядеть так:

[
[userAId: [actionId: action1, waiting: false],[actionId: action2, waiting: true]],  
[userBId: [actionId: action3, waiting: true], [actionId: action4, waiting: false]],  
[userCId: [actionId: action1, waiting: false],[actionId: action3, waiting: true]],
[userDId: [actionId: action2, waiting: true], [actionId: action4, waiting: false]]
]

Таким образом, когда выполнение действия заканчивается, я удаляю запись с карты, используя:
userAId, userBId, actionId
И возьмите подробности о первом неблокируемом ожидающем действии для userA и userB [если они есть] и передайте их для выполнения.

Итак, теперь мне понадобятся два метода, которые будут записывать данные на карту и удалять их с карты.

public boolean add(long userA, long userB, String action) {
    boolean userAEntryExists = users.containsKey(userA)
    boolean userBEntryExists = users.containsKey(userB)
    boolean actionWaiting = true
    UserRegistryEntry userAEntry = new UserRegistryEntry(actionId: action, waiting: false)
    UserRegistryEntry userBEntry = new UserRegistryEntry(actionId: action, waiting: false)
    if (userAEntryExists || userBEntryExists) {
        if (userAEntryExists) {
            for (entry in users.get(userA)) {
                if (!entry.waiting) {
                    userAEntry.waiting = true
                    userBEntry.waiting = true
                    actionWaiting = true
                    break;
                }
            }
        }

        if (!actionWaiting && userBEntryExists) {
            for (entry in users.get(userB)) {
                if (!entry.waiting) {
                    userAEntry.waiting = true
                    userBEntry.waiting = true
                    actionWaiting = true
                    break;
                }
            }
        }
    }

    if (userBEntryExists) {
        users.get(userA).add(userAEntry)
    } else {
        users.put(userA, [userAEntry])
    }

    if (userAEntryExists) {
        users.get(userB).add(userBEntry)
    } else {
        users.put(userB, [userBEntry])
    }

    return actionWaiting
}

А для удаления:

public List remove(long userA, long userB, String action) {
        List<String> nextActions = []
        finishActionAndReturnNew(userA, action, nextActions)
        finishActionAndReturnNew(userB, action, nextActions)

        return nextActions;
    }

    private def finishActionAndReturnNew(long userA, String action, List<String> nextActions) {
        boolean userRemoved = false
        boolean actionFound = false
        Iterator itA = users.get(userA).iterator()
        while (itA.hasNext()) {
            UserRegistryEntry entry = itA.next()
            if (!userRemoved && entry.actionId == action) {
                itA.remove()
            } else {
                if (!actionFound && isUserFree(entry.otherUser)) {
                    nextActions.add(entry.actionId)
                }
            }
            if (userRemoved && actionFound) break
        }
    }

    public boolean isUserFree(long userId) {
        boolean userFree = true
        if (!users.containsKey(userId)) return true
        for (entry in users.get(userId)) {
            if (!entry.waiting) userFree = false
        }
        return userFree
    }

ЗАКЛЮЧИТЕЛЬНАЯ МЫСЛЬ :
Этот сценарий убийца:
[ActionID, userA, userB]
[a, 1,2]
[b, 1,3]
[c, 3,4]
[д, 3,1]
Действие a и c выполняется одновременно, b и d ожидают.
Когда a и c будут выполнены, записи для пользователей 1, 2, 3, 4 должны быть удалены, поэтому у одного потока будут заблокированы 1 и 2, у другого - 3 и 4 заблокированы. Когда эти пользователи заблокированы, необходимо выполнить проверку следующего действия для каждого из них. Когда код определяет, что для пользователя 1 следующее действие выполняется для пользователя 3, а для пользователя 3 следующее действие выполняется для пользователя 1, сыворотка попытается заблокировать их. Это когда тупик случается. Я знаю, что мог бы обойти это, но, похоже, выполнение займет много времени и заблокирует двух рабочих.
А пока я задам еще один вопрос по SO, подробнее по теме моей проблемы и постараюсь прототипировать решение с использованием JMS.

Ответы [ 3 ]

3 голосов
/ 09 ноября 2010

Возможно, вам придется пересмотреть, как синхронизированные (коллекции) снова работают:

Это (как неисключительный пример) не является потокобезопасным:

if (users.get(userA).size() > 1) {
    users.get(userA).remove(0)

Помните, что только отдельные «синхронизированные» методы гарантированно атомарны без увеличения объема блокировки.

Счастливое кодирование.

Редактирование - блокировки синхронизации для каждого пользователя (обновлено для комментариев):

Просто используя стандартные структуры данных, можно добиться блокировок для каждого ключа с помощью ConcurrentHashMap - в частности, с помощью метода putIfAbsent.(Это значительно отличается , чем просто использование get / put 'синхронизированного HashMap', см. Выше.)

Ниже приведен псевдокод и примечания:

public boolean add(long userA, long userB, String action) {
    // The put-if-absent ensures the *the same* object but may be violated when:
    //   -users is re-assigned
    //   -following approach is violated
    // A new list is created if needed and the current list is returned if
    // it already exists (as per the method name).
    // Since we have synchronized manually here, these lists
    // themselves do not need to be synchronized, provided:
    // Access should consistently be protected across the "higher"
    // structure (per user-entry in the map) when using this approach.
    List listA = users.putIfAbsent(userA, new List)
    List listB = users.putIfAbsent(userB, new List)
    // The locks must be ordered consistently so that
    // a A B/B A deadlock does not occur.
    Object lock1, lock2
    if (userA < userB) {
        lock1 = listA, lock2 = listB
    } else {
        lock1 = listB, lock2 = listA
    }
    synchronized (lock1) { synchronized (lock2) {{ // start locks

    // The rest of the code can be simplified, since the
    // list items are already *guaranteed* to exist there is no
    // need to alternate between add and creating a new list.
    bool eitherUserBusy = listA.length > 0 || listB.length > 0
    listA.add(action)
    listB.add(action)
    // make sure messages allows thread-safe access as well
    messages.put(action, [userA, userB])
    return !eitherUserBusy

    }} // end locks
}

Я не знаю, как это происходит в вашем сценарии использования по сравнению с одним общим объектом блокировки.Часто рекомендуется идти с «проще», если нет явного преимущества, если вы поступите иначе.

HTH и Happy coding.

2 голосов
/ 09 ноября 2010

Возможно, вы захотите проверить Collections.synchronizedMap () или Collections.synchronizedList ()

1 голос
/ 09 ноября 2010

У вас есть два глобальных держателя состояний в классе и составные действия в каждом из двух методов, которые модифицируют оба из них.Таким образом, даже если бы мы изменили Map на ConcurrentHashMap и List на что-то вроде CopyOnWriteArrayList, это все равно не гарантировало бы согласованное состояние.

Я вижу, что вы будете часто писать в список, так что CopyOnWriteArrayList может быть слишком дорогим в любом случае.ConcurrentHashMap является только 16-полосным.Если у вас более качественное оборудование, альтернативой может быть highscalelib Cliff Click (после соответствующей блокировки методов).

Вернемся к вопросу о согласованности, как насчет использования ReentrantLock вместо синхронизации и посмотреть, можете ли вы исключить некоторые операторывне последовательности блокировки () - to-unlock ().Если вы использовали ConcurrentMap, первые два выражения в add (), которые действительно содержат hasKey (), могут быть оптимистичными, и вы сможете исключить их из блока блокировки.

Вам действительно нужна карта сообщений?Это похоже на обратный индекс пользователей.Еще один вариант - использовать другой метод watch (), который периодически обновляет карту сообщений на основе сигнала add () после изменения для пользователей.Обновление может быть полностью асинхронным.При этом вы можете использовать ReadWriteLock с readLock () для пользователей во время обновления сообщений.В этой ситуации add () может безопасно получить writeLock () для пользователей.Это просто еще одна работа, чтобы получить это разумно правильно.

...