Как закодировать блок мьютекса C#, который зависит от идентификатора объекта? - PullRequest
2 голосов
/ 06 мая 2020

Я ищу шаблон C# для кодирования синхронизированной операции, включая запись в две разные базы данных для конкретного объекта, чтобы я мог избежать условий гонки для одновременных операций над одним и тем же объектом.

Например, Поток 1 и поток 2 одновременно обрабатывают операцию над объектом X. Операция записывает информацию для X в базу данных A (в моем случае - upsert для MongoDB) и базу данных B (вставка в SqlServer). Поток 3 обрабатывает ту же операцию над объектом Y. Желаемое поведение:

  • Поток 1 блокирует поток 2 при обработке записи в A и B для объекта X.
  • Поток 2 ожидает пока поток 1 не завершит запись в A и B, а затем сделает запись в A и B для объекта X.
  • Поток 3 не заблокирован и обрабатывает записи в A и B для объекта Y, пока поток 1 обрабатывает

Я пытаюсь избежать следующего поведения:

  • Поток 1 записывает в A для объекта X.
  • Поток 2 записывает в A для объекта X.
  • Поток 2 записывает в B для объекта X.
  • Поток 1 записывает в B для объекта X.

Я мог бы использовать мьютекс для всех потоков, но я не Я действительно не хочу блокировать операцию для другого объекта.

Ответы [ 2 ]

1 голос
/ 06 мая 2020

Я бы предложил использовать простую блокировку (если она находится в одной области кода), поскольку она будет обрабатывать разные объекты (имеется в виду. net объектов), но с одинаковым значением (поскольку это один и тот же объект), я бы скорее go с некоторой формой кода для сущностей. Если у объекта есть какая-то форма кода, я бы использовал его - например:

Но, конечно, вы должны остерегаться тупиковых ситуаций. А String.Intern сложен, поскольку он стажирует строку на протяжении всего времени работы приложения.

lock(String.Intern(myEntity.Code))
{
   SaveToDatabaseA(myEntity);
   SaveToDatabaseB(myEntity);
}

Но похоже, что вы хотите иметь какой-то механизм репликации. Тогда я бы предпочел сделать это на уровне базы данных (а не на уровне кода)

[ОБНОВЛЕНИЕ]

Вы обновили вопрос, указав информацию, что это выполняется на нескольких серверы. И эта информация здесь очень важна :) Обычная блокировка не работает.

Конечно, вы можете поиграться с синхронизацией блокировок на разных серверах, но это похоже на распределенные транзакции. Теоретически вы можете это сделать, но большинство людей просто избегают этого до тех пор, пока могут, и они играют с архитектурой решения, чтобы упростить процесс.

[ОБНОВЛЕНИЕ 2]

Вы также можете найти это интересным: Распределенная блокировка. NET

:)

1 голос
/ 06 мая 2020

Использование оператора lock является недостаточным для нескольких процессов 1 . Даже именованные / системные семафоры ограничены одной машиной и, следовательно, недостаточны для нескольких серверов.

Если повторная обработка в порядке и можно выбрать «победителя», этого может быть достаточно просто для записи / обновления или использования разновидности optimisti c concurrency . Если необходимо поддерживать более строгие гарантии одновременного выполнения одного процесса, необходимо использовать механизм глобальной блокировки - SQL Сервер поддерживает такой механизм через sp_getapplock .

Аналогичным образом модель может быть обновляется так, что каждый агент «запрашивает» следующую единицу работы, так что диспетчеризация может управляться централизованно и что объект, основанный на ID et c., предоставляется для обработки только одному агенту за раз. Другой вариант - использовать систему обмена сообщениями, например RabbitMQ (или Kafka et c., Fsvo); для RabbitMQ можно даже использовать Consistent Hashing , чтобы гарантировать (по большей части), что разные потребители получают неперекрывающиеся сообщения. Детали различаются в зависимости от используемой реализации.

Из-за разной природы SQL РСУБД и MongoDB (особенно если они используются в качестве «кеша»), этого может быть достаточно для ослабления ограничений и / или дизайна проблема с использованием MongoDB для чтения (это хороший способ использовать кеши). Это может смягчить проблему парной записи, хотя и не предотвращает глобальную одновременную обработку одних и тех же элементов.

1 Даже несмотря на то, что оператора блокировки недостаточно , его можно по-прежнему использовать локально между потоками в одном процессе , чтобы уменьшить локальную конкуренцию и / или минимизировать глобальную блокировку.


Ответ ниже был для исходного вопроса, предполагая одиночный процесс .

«Стандартный» метод предотвращения одновременной работы с одним и тем же объектом через несколько потоков - это оператор блокировки для указанного c объекта . Блокировка устанавливается на самом объекте, так что lock(X) и lock(Y) независимы, когда !ReferenceEquals(X,Y).

Оператор блокировки получает блокировку взаимного исключения для данного объекта , выполняет блок операторов, а затем снимает блокировку. Пока блокировка удерживается, поток, удерживающий блокировку, может снова получить и снять блокировку. Любой другой поток заблокирован для получения блокировки и ждет, пока блокировка не будет снята .

lock (objectBeingSaved) {
  // This code execution is mutually-exclusive over a specific object..
  // ..and independent (non-blocking) over different objects.
  Process(objectBeingSaved);
}

Локальная блокировка процесса не обязательно означает достаточные гарантии для доступ к базам данных или когда доступ распространяется на процессы. Также следует учитывать объем блокировки: например. должен ли он охватывать всю обработку, только сохранение или какую-либо другую рабочую единицу? c видимость объектов явно (и только для) с целью установления блокировки. Это также можно использовать для группировки объектов, которые должны блокироваться друг на друге, если это необходимо.

Также можно использовать пул блокировок, хотя это, как правило, более «продвинутый» вариант использования с только указание c применимость. Использование пулов также позволяет использовать семафоры (в еще более специфичных c случаях использования), а также простую блокировку.

Если требуется блокировка на внешний идентификатор , один подход состоит в интеграции обрабатываемых сущностей с пулом, устанавливая блокировки между сущностями:

// Some lock pool. Variations of the strategy:
// - Weak-value hash table
// - Explicit acquire/release lock
// - Explicit acquire/release from ctor and finalizer (or Dispose)
var locks = CreateLockPool();
// When object is created, assign a lock object
var entity = CreateEntity();
// Returns same lock object (instance) for the given ID, and a different
// lock object (instance) for a different ID.
etity.Lock = GetLock(locks, entity.ID);

lock (entity.Lock) {
  // Mutually exclusive per whatever rules are to select the lock
  Process(entity);
}

Другой вариант - локализованный пул, вместо того, чтобы переносить объект блокировки для каждой сущности. Концептуально это та же модель, что и выше, только перевернутая наружу внутрь. Вот суть. YMMV.

private sealed class Locker { public int Count; }

IDictionary<int, Locker> _locks = new Dictionary<int, Locker>();

void WithLockOnId(int id, Action action) {
  Locker locker;
  lock (_locks) {
     // The _locks might have lots of contention; the work
     // done inside is expected to be FAST in comparison to action().
     if (!_locks.TryGetValue(id, out locker)
        locker = _locks[id] = new Locker();
     ++locker.Count;
  }
  lock (locker) {
     // Runs mutually-exclusive by ID, as established per creation of
     // distinct lock objects.
     action();
  }
  lock (_locks) {
     // Don't forget to take out the garbage..
     // This would be better with try/finally, which is left as an exercise
     // to the reader, along with fixing any other minor errors.
     if (--_locks[id].Count == 0)
       _locks.Remove(id);
  }
}

// And then..
WithLockOnId(x.ID, () => Process(x));

Сделав шаг в сторону, другой подход заключается в «сегментировании» объектов между потоками / процессорами. Таким образом, гарантируется, что каждый поток никогда не будет обрабатывать один и тот же объект, что и другой поток: X, Y, Z всегда go на # 1 и P, D, Q всегда на # 2. (Немного сложнее оптимизировать пропускную способность ..)

var threadIndex = entity.ID % NumThreads;
QueueWorkOnThread(threadIndex, entity); // eg. add to List<ConcurrentQueue>
...