Чтобы сделать процессы взаимоисключающими, вы можете использовать RedLock.Net
.Это Distributed Lock Manager
, как оператор lock
, который работает для процессов, которые не имеют возможности узнать друг друга.Вот пример:
public async Task ProcessMessage(Message message)
{
// the thing we are trying to lock, i.e: "3"
var resource = message.Key;
// determines how long will the lock be alive untill it's automatically released
var expiry = TimeSpan.FromSeconds(30);
// how long will the thread wait trying to acquire the lock
var wait = TimeSpan.FromSeconds(10);
// time span between each request to Redis trying to acquire the lock
var retry = TimeSpan.FromSeconds(1);
// blocks the thread until acquired or 'wait' timeout
using (var redLock = await redlockFactory.CreateLockAsync(resource, expiry, wait, retry))
{
// make sure we got the lock
if (redLock.IsAcquired)
{
// we successfully locked the resource, now other processes will have to wait
ProcessMessageInternal(message.Value);
}
else
{
// could't get the lock within the wait time
// handle collision
}
}
// the lock is automatically released at the end of the using block
// which means the IDisposable.Dispose method makes a request to Redis to release the lock
}
Обратите внимание, как я использую сообщение Key
в качестве ресурса для блокировки.Это означает, что любой другой процесс не сможет заблокировать ресурс до тех пор, пока блокировка не будет устранена или истекла.
Что касается реализации системы pub / sub, я настоятельно рекомендую вам Azure Storage Queue
, создайте Queue Trigger
и подпишите на него свою программу.
Все это звучит сложно, но очень просто реализовать: вы можете разделить потоки вашего приложения на два процесса:
Читатель сообщения: , который просто ставит сообщение в очередь, когда сообщение приходит вот так:
// Retrieve storage account from connection string.
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
CloudConfigurationManager.GetSetting("StorageConnectionString"));
// Create the queue client.
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.GetQueueReference("myqueue");
// Create the queue if it doesn't already exist.
queue.CreateIfNotExists();
var message = // get message
var json = SerializeMessage(message);
// Create a message and add it to the queue.
CloudQueueMessage message = new CloudQueueMessage(json);
queue.AddMessage(message);
Обработчик сообщений: , кто будет подписыватьсяв очередь, используя QueueTrigger
, есть шаблон проекта для Visual Studio, который называется Azure Functions
, который вам просто нужно передать в строку подключения к хранилищу вместе с именем очереди, и он будет обрабатывать параллелизм длявы.Этот процесс будет распространяться горизонтально (это означает, что его будет много), поэтому он должен быть взаимоисключающим со своим братом или сестрой и достигнет этого с помощью RedLock.Net
.Функция Azure будет блокироваться следующим образом:
public class Functions
{
public static void ProcessQueueMessage([QueueTrigger(QueueName)] string serializedMessage)
{
var message = DeserializeMessage(serializedMessage);
MessageProcesor.ProcessMessage(message);
}
}
Вы также можете использовать Service Bus Queue
вместо Azure Storage Queue
, если вам нужно обрабатывать большие сообщения с высокой скоростью.Вот сравнение между ними: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-azure-and-service-bus-queues-compared-contrasted