Одновременное чтение / запись с / на Redis Set - один сервер, несколько клиентов - PullRequest
0 голосов
/ 23 сентября 2018

У нас есть несколько приложений, работающих на разных компьютерах.Сервер Redis должен содержать очередь общего ключа / значения для всех приложений.Каждое приложение имеет 2 потока, один для заполнения очереди и другой для итерации и обработки очереди.

Предположим, что очередь содержит следующие элементы: [(1, value1), (2, v2), (3, v3), (4, v4)].Нам нужно, чтобы элемент с ключом 3 просматривался только одним клиентом, а элемент одновременного просмотра запроса - с ключом 4 или любым другим ключом.

  • Каков наилучший способ реализовать это с помощью Redis?
  • Есть ли способ достичь цели с помощью строки SET?
  • Может ли система паб / подсистема быть реализована для этого с Redis?

Спасибозаранее.

ПРИМЕЧАНИЕ Клиенты, написанные на C #, StackExchange.Redis

Ответы [ 2 ]

0 голосов
/ 24 сентября 2018

Вы можете использовать LIST для сохранения этих предметов и взять LIST в качестве очереди.Используйте команды push / pop , чтобы заполнить очередь и извлечь элементы из очереди.

LIST гарантирует одновременное чтение / запись, а элемент может быть выбран только одним клиентом.

0 голосов
/ 24 сентября 2018

Чтобы сделать процессы взаимоисключающими, вы можете использовать RedLock.Net.Это Distributed Lock Manager, как оператор lock, который работает для процессов, которые не имеют возможности узнать друг друга.Вот пример:

public async Task ProcessMessage(Message message) 
{   
    // the thing we are trying to lock, i.e: "3"
    var resource = message.Key; 

    // determines how long will the lock be alive untill it's automatically released
    var expiry = TimeSpan.FromSeconds(30);

    // how long will the thread wait trying to acquire the lock
    var wait = TimeSpan.FromSeconds(10);

    // time span between each request to Redis trying to acquire the lock
    var retry = TimeSpan.FromSeconds(1);

    // blocks the thread until acquired or 'wait' timeout
    using (var redLock = await redlockFactory.CreateLockAsync(resource, expiry, wait, retry))
    {
        // make sure we got the lock
        if (redLock.IsAcquired)
        {
            // we successfully locked the resource, now other processes will have to wait
            ProcessMessageInternal(message.Value);
        }
        else 
        {
            // could't get the lock within the wait time
            // handle collision
        }
    }

    // the lock is automatically released at the end of the using block
    // which means the IDisposable.Dispose method makes a request to Redis to release the lock
}

Обратите внимание, как я использую сообщение Key в качестве ресурса для блокировки.Это означает, что любой другой процесс не сможет заблокировать ресурс до тех пор, пока блокировка не будет устранена или истекла.

Что касается реализации системы pub / sub, я настоятельно рекомендую вам Azure Storage Queue, создайте Queue Trigger и подпишите на него свою программу.

Все это звучит сложно, но очень просто реализовать: вы можете разделить потоки вашего приложения на два процесса:

Читатель сообщения: , который просто ставит сообщение в очередь, когда сообщение приходит вот так:

// Retrieve storage account from connection string.
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
    CloudConfigurationManager.GetSetting("StorageConnectionString"));

// Create the queue client.
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();

// Retrieve a reference to a queue.
CloudQueue queue = queueClient.GetQueueReference("myqueue");

// Create the queue if it doesn't already exist.
queue.CreateIfNotExists();

var message = // get message

var json = SerializeMessage(message);

// Create a message and add it to the queue.
CloudQueueMessage message = new CloudQueueMessage(json);
queue.AddMessage(message);

Обработчик сообщений: , кто будет подписыватьсяв очередь, используя QueueTrigger, есть шаблон проекта для Visual Studio, который называется Azure Functions, который вам просто нужно передать в строку подключения к хранилищу вместе с именем очереди, и он будет обрабатывать параллелизм длявы.Этот процесс будет распространяться горизонтально (это означает, что его будет много), поэтому он должен быть взаимоисключающим со своим братом или сестрой и достигнет этого с помощью RedLock.Net.Функция Azure будет блокироваться следующим образом:

public class Functions 
{
    public static void ProcessQueueMessage([QueueTrigger(QueueName)] string serializedMessage)
    {
        var message = DeserializeMessage(serializedMessage);

        MessageProcesor.ProcessMessage(message);
    }
}

Вы также можете использовать Service Bus Queue вместо Azure Storage Queue, если вам нужно обрабатывать большие сообщения с высокой скоростью.Вот сравнение между ними: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-azure-and-service-bus-queues-compared-contrasted

...