Использование оператора lock
является недостаточным для нескольких процессов 1 . Даже именованные / системные семафоры ограничены одной машиной и, следовательно, недостаточны для нескольких серверов.
Если повторная обработка в порядке и можно выбрать «победителя», этого может быть достаточно просто для записи / обновления или использования разновидности optimisti c concurrency . Если необходимо поддерживать более строгие гарантии одновременного выполнения одного процесса, необходимо использовать механизм глобальной блокировки - SQL Сервер поддерживает такой механизм через sp_getapplock .
Аналогичным образом модель может быть обновляется так, что каждый агент «запрашивает» следующую единицу работы, так что диспетчеризация может управляться централизованно и что объект, основанный на ID et c., предоставляется для обработки только одному агенту за раз. Другой вариант - использовать систему обмена сообщениями, например RabbitMQ (или Kafka et c., Fsvo); для RabbitMQ можно даже использовать Consistent Hashing , чтобы гарантировать (по большей части), что разные потребители получают неперекрывающиеся сообщения. Детали различаются в зависимости от используемой реализации.
Из-за разной природы SQL РСУБД и MongoDB (особенно если они используются в качестве «кеша»), этого может быть достаточно для ослабления ограничений и / или дизайна проблема с использованием MongoDB для чтения (это хороший способ использовать кеши). Это может смягчить проблему парной записи, хотя и не предотвращает глобальную одновременную обработку одних и тех же элементов.
1 Даже несмотря на то, что оператора блокировки недостаточно , его можно по-прежнему использовать локально между потоками в одном процессе , чтобы уменьшить локальную конкуренцию и / или минимизировать глобальную блокировку.
Ответ ниже был для исходного вопроса, предполагая одиночный процесс .
«Стандартный» метод предотвращения одновременной работы с одним и тем же объектом через несколько потоков - это оператор блокировки для указанного c объекта . Блокировка устанавливается на самом объекте, так что lock(X)
и lock(Y)
независимы, когда !ReferenceEquals(X,Y)
.
Оператор блокировки получает блокировку взаимного исключения для данного объекта , выполняет блок операторов, а затем снимает блокировку. Пока блокировка удерживается, поток, удерживающий блокировку, может снова получить и снять блокировку. Любой другой поток заблокирован для получения блокировки и ждет, пока блокировка не будет снята .
lock (objectBeingSaved) {
// This code execution is mutually-exclusive over a specific object..
// ..and independent (non-blocking) over different objects.
Process(objectBeingSaved);
}
Локальная блокировка процесса не обязательно означает достаточные гарантии для доступ к базам данных или когда доступ распространяется на процессы. Также следует учитывать объем блокировки: например. должен ли он охватывать всю обработку, только сохранение или какую-либо другую рабочую единицу? c видимость объектов явно (и только для) с целью установления блокировки. Это также можно использовать для группировки объектов, которые должны блокироваться друг на друге, если это необходимо.
Также можно использовать пул блокировок, хотя это, как правило, более «продвинутый» вариант использования с только указание c применимость. Использование пулов также позволяет использовать семафоры (в еще более специфичных c случаях использования), а также простую блокировку.
Если требуется блокировка на внешний идентификатор , один подход состоит в интеграции обрабатываемых сущностей с пулом, устанавливая блокировки между сущностями:
// Some lock pool. Variations of the strategy:
// - Weak-value hash table
// - Explicit acquire/release lock
// - Explicit acquire/release from ctor and finalizer (or Dispose)
var locks = CreateLockPool();
// When object is created, assign a lock object
var entity = CreateEntity();
// Returns same lock object (instance) for the given ID, and a different
// lock object (instance) for a different ID.
etity.Lock = GetLock(locks, entity.ID);
lock (entity.Lock) {
// Mutually exclusive per whatever rules are to select the lock
Process(entity);
}
Другой вариант - локализованный пул, вместо того, чтобы переносить объект блокировки для каждой сущности. Концептуально это та же модель, что и выше, только перевернутая наружу внутрь. Вот суть. YMMV.
private sealed class Locker { public int Count; }
IDictionary<int, Locker> _locks = new Dictionary<int, Locker>();
void WithLockOnId(int id, Action action) {
Locker locker;
lock (_locks) {
// The _locks might have lots of contention; the work
// done inside is expected to be FAST in comparison to action().
if (!_locks.TryGetValue(id, out locker)
locker = _locks[id] = new Locker();
++locker.Count;
}
lock (locker) {
// Runs mutually-exclusive by ID, as established per creation of
// distinct lock objects.
action();
}
lock (_locks) {
// Don't forget to take out the garbage..
// This would be better with try/finally, which is left as an exercise
// to the reader, along with fixing any other minor errors.
if (--_locks[id].Count == 0)
_locks.Remove(id);
}
}
// And then..
WithLockOnId(x.ID, () => Process(x));
Сделав шаг в сторону, другой подход заключается в «сегментировании» объектов между потоками / процессорами. Таким образом, гарантируется, что каждый поток никогда не будет обрабатывать один и тот же объект, что и другой поток: X, Y, Z всегда go на # 1 и P, D, Q всегда на # 2. (Немного сложнее оптимизировать пропускную способность ..)
var threadIndex = entity.ID % NumThreads;
QueueWorkOnThread(threadIndex, entity); // eg. add to List<ConcurrentQueue>