Как распределить ConcurrentDictionary между серверами балансировки нагрузки при использовании концентратора SignalR с Redis - PullRequest
0 голосов
/ 02 января 2019

У меня есть настройка веб-приложения ASP.NET Core с SignalR, уменьшенным с помощью Redis.Использование встроенных групп прекрасно работает:

Clients.Group("Group_Name");

и выдерживает несколько балансировщиков нагрузки.Я предполагаю, что SignalR автоматически сохраняет эти группы в Redis, чтобы все серверы знали, какие у нас есть группы и кто на них подписан.

Однако в моей ситуации я не могу просто полагаться на группы (или пользователей).), так как нет способа сопоставить connectionId (скажем, при перегрузке OnDisconnectedAsync и известен только идентификатор соединения) обратно в его группу, и вам всегда нужно Group_Name для идентификации группы.Мне нужно это, чтобы определить, какая часть группы находится в сети, поэтому, когда вызывается OnDisconnectedAsync, я знаю, к какой группе принадлежит этот парень, и на какой стороне разговора он находится.

Я сделалнекоторые исследования, и все они предложили (включая Microsoft Docs) использовать что-то вроде:

static readonly ConcurrentDictionary<string, ConversationInformation> connectionMaps;

в самом хабе.

Теперь это отличное решение (и поточно-ориентированное), за исключением того, что он существует только в одной из памяти сервера балансировки нагрузки, а другие серверы имеют другой экземпляр этого словаря.

Вопрос в том, нужно ли сохранять connectionMaps вручную?Например, используя Redis?

Что-то вроде:

public class ChatHub : Hub
{
    static readonly ConcurrentDictionary<string, ConversationInformation> connectionMaps;

    ChatHub(IDistributedCache distributedCache)
    {

        connectionMaps = distributedCache.Get("ConnectionMaps");
       /// I think connectionMaps should not be static any more.
    }
}

и, если да, это потокобезопасно?Если нет, можете ли вы предложить лучшее решение, которое работает с балансировкой нагрузки?

1 Ответ

0 голосов
/ 21 января 2019

С этой целью боролись с той же проблемой. Я придумал, чтобы сохранить коллекции в кэше redis, используя StackExchange.Redis.IDatabaseAsync вместе с блокировками для обработки параллелизма. Это, к сожалению, синхронизирует весь процесс, но не может обойти это.

Вот суть того, что я делаю, это блокировка и возврат обратно десериализованной коллекции из кеша


    private async Task<ConcurrentDictionary<int, HubMedia>> GetMediaAttributes(bool requireLock)
        {
            if(requireLock)
            {
                var retryTime = 0;
                try
                {
                    while (!await _redisDatabase.LockTakeAsync(_mediaAttributesLock, _lockValue, _defaultLockDuration))
                    {
                        //wait till we can get a lock on the data, 100ms by default
                        await Task.Delay(100);
                        retryTime += 10;
                        if (retryTime > _defaultLockDuration.TotalMilliseconds)
                        {
                            _logger.LogError("Failed to get Media Attributes");
                            return null;
                        }
                    }
                }
                catch(TaskCanceledException e)
                {
                    _logger.LogError("Failed to take lock within the default 5 second wait time " + e);
                    return null;
                }

            }
            var mediaAttributes = await _redisDatabase.StringGetAsync(MEDIA_ATTRIBUTES_LIST);
            if (!mediaAttributes.HasValue)
            {
                return new ConcurrentDictionary<int, HubMedia>();
            }
            return JsonConvert.DeserializeObject<ConcurrentDictionary<int, HubMedia>>(mediaAttributes);
        }

Обновление коллекции таким образом, как только я закончу с ней манипулировать

        private async Task<bool> UpdateCollection(string redisCollectionKey, object collection, string lockKey)
        {
            var success = false;
            try
            {
                success = await _redisDatabase.StringSetAsync(redisCollectionKey, JsonConvert.SerializeObject(collection, new JsonSerializerSettings
                {
                    ReferenceLoopHandling = ReferenceLoopHandling.Ignore
                }));
            }
            finally
            {
                await _redisDatabase.LockReleaseAsync(lockKey, _lockValue);
            }
            return success;
        }

и когда я закончу, я просто обеспечу снятие блокировки для других экземпляров, чтобы захватить и использовать


private async Task ReleaseLock(string lockKey)
        {
            await _redisDatabase.LockReleaseAsync(lockKey, _lockValue);
        }

Буду рад услышать, если вы найдете лучший способ сделать это. Изо всех сил пытался найти любую документацию в масштабе с сохранением данных и обменом.

...