Обе INSERT
и UPDATE
получают блокировку ROW EXCLUSIVE
, поэтому вы не найдете блокировки на уровне таблицы, исключающей одну, но не другую.
Выможет заблокировать все существующие строки на изменения с SELECT FOR UPDATE
, но это не повлияет одновременно на INSERT
редактируемые записи, поэтому они все равно будут выбраны и обработаны, независимо от того, какие задачи выполняются в данный момент.
Также могут возникнуть проблемы с синхронизацией таблицы entities_for_tasks
с entity_tasks
, в зависимости от того, как именно вы ее заполняете, и какой уровень изоляции вы используете;этот тип паттерна склонен к гоночным условиям при значениях ниже SERIALIZABLE
.
Делая шаг назад, вы действительно решаете две разные задачи: создание и распределение задач и координация выполнения задач,Первая проблема прекрасно решается базовым механизмом очередей, но попытка решить вторую путем перегрузки этого же механизма, по-видимому, является источником всех этих конфликтов.
Итак, оставьте очередь в покое и подумайте очто еще вам действительно нужно для координации выполнения задачи:
- Блокировка, которая говорит "задача выполняется"
- Набор блокировок, которые говорят "задача выполняется против сущности * 1029"* "
... где задаче из block_everything_tasks
требуется эксклюзивная блокировка в (1), в то время как задачи из entity_tasks
могут совместно использовать блокировку в (1) друг с другом, но необходимоисключительная блокировка на (2).
Наиболее явный способ реализовать это через консультативные блокировки , которые позволяют вам "блокировать" произвольные целые числа, которые содержат некоторое значение, специфичное для приложения.
Предполагая, что ни у одного объекта нет идентификатора 0
, давайте использовать его для блокировки верхнего уровня "задача выполняется".Затем, после успешного извлечения задачи из очереди, будет выполняться каждая исключительная задача:
SELECT pg_advisory_xact_lock(0);
... и каждая задача для каждого объекта будет выполняться:
SELECT pg_advisory_xact_lock_shared(0);
SELECT pg_advisory_xact_lock(<entity_id of selected task>);
Основная проблема с консультативной блокировкой состоит в том, что каждый пользователь базы данных должен договориться о том, что означают эти целые числа, или они могут в конечном итоге бороться за одну и ту же блокировку в несвязанных целях.Двухпараметрические (int,int)
перегрузки функций блокировки позволяют ограничивать ваши блокировки конкретным случаем использования, но это не очень помогает, когда ваши идентификаторы bigint
с.
Если вы не увереныВы единственный в своей базе данных, который использует консультативные блокировки, вы можете эмулировать это с помощью табличного подхода.Настройте таблицу:
CREATE_TABLE currently_processing (
entity_id bigint PRIMARY KEY
);
... затем для исключительных задач:
LOCK currently_processing;
... и для задач для отдельных объектов:
INSERT INTO currently_processing VALUES (<entity_id of selected task>);
<run the task>
DELETE FROM currently_processing WHERE entity_id = <entity_id of selected task>;
INSERT
s попытается получить общую блокировку таблицы (заблокированную исключительной задачей), а уникальный индекс PRIMARY KEY
вызовет одновременное блокирование INSERT
s для того же идентификатора до фиксации конфликтующей транзакции.или откатывается.