Блокировка всех строк в таблице PostgreSQL - PullRequest
0 голосов
/ 26 февраля 2019

Я пытаюсь создать своего рода очередь задач в таблице PG, похожую на эту https://www.pgcon.org/2016/schedule/attachments/414_queues-pgcon-2016.pdf, но немного более сложную.

1) Существуют задачи, связанные с определеннойentity_id, и они могут выполняться параллельно, если entity_id для них различны.Для них есть таблица:

create table entity_tasks (
  entity_id bigint,
  task text,
  inserted_at timestamp default now()
);

2) Существуют задачи, которые должны выполняться исключительно, то есть последовательно со всеми остальными задачами.Для задач этого типа также есть таблица:

create table block_everything_tasks (
  task TEXT,
  inserted_at TIMESTAMP DEFAULT NOW()
);

Выполнение задачи с block_everything_tasks должно блокировать выполнение всех задач с entity_tasks и с block_everything_tasks.

После некоторого прототипирования я также добавил таблицу

create table entities_for_tasks (
  entity_id bigint primary key
);

Выборка и выполнение задач для каждой сущности работает так:

begin;
    select entity_id into entity_to_lock
    from entities_for_tasks
    for update skip locked
    limit 1;

    select * from entity_tasks
    where entity_id = entity_to_lock
    order by inserted_at
    limit 1;

    -- execute them and delete from the `entity_tasks`
commit;

Пока все хорошо, но этостановится неловко, когда я пытаюсь реализовать выборку задач из block_everything_tasks.Я вижу некоторые решения здесь, но не люблю ни одно из них.

1) Я могу явно заблокировать всю таблицу entity_to_lock, например,

begin;
    lock table entity_to_lock;

    select * from block_everything_tasks
    order by inserted_at
    limit 1;

    -- execute them and delete from the `entity_tasks`
commit;

, но это предотвратит добавление строк в задачи в entity_to_lock и может заблокировать добавление задач в одну изочереди.

2) Или я могу попытаться сделать что-то вроде этого

begin;
    with lock as (
      select * from entity_to_lock for update
    )
    select * from block_everything_tasks
    order by inserted_at
    for update skip locked
    limit 1;

    -- execute them and delete from the `entity_tasks`
commit;

это выглядит как нормальное решение, я не блокирую отправителей, а entity_to_lock нетв любом случае слишком большой, но я не использую строки из entity_to_lock, и они не заблокированы, поэтому он просто не работает.

Так что мои вопросы

  • есть ли способы заблокировать таблицу entity_to_lock в опции (1), чтобы вставки все еще были возможны, а select * from entity_to_lock where ... for update были заблокированы?
  • Или есть способы заблокировать все строки в опции (2)фактически не потребляя эти строки?
  • Или я должен придумать что-то еще здесь?

1 Ответ

0 голосов
/ 27 февраля 2019

Обе INSERT и UPDATE получают блокировку ROW EXCLUSIVE, поэтому вы не найдете блокировки на уровне таблицы, исключающей одну, но не другую.

Выможет заблокировать все существующие строки на изменения с SELECT FOR UPDATE, но это не повлияет одновременно на INSERT редактируемые записи, поэтому они все равно будут выбраны и обработаны, независимо от того, какие задачи выполняются в данный момент.

Также могут возникнуть проблемы с синхронизацией таблицы entities_for_tasks с entity_tasks, в зависимости от того, как именно вы ее заполняете, и какой уровень изоляции вы используете;этот тип паттерна склонен к гоночным условиям при значениях ниже SERIALIZABLE.


Делая шаг назад, вы действительно решаете две разные задачи: создание и распределение задач и координация выполнения задач,Первая проблема прекрасно решается базовым механизмом очередей, но попытка решить вторую путем перегрузки этого же механизма, по-видимому, является источником всех этих конфликтов.

Итак, оставьте очередь в покое и подумайте очто еще вам действительно нужно для координации выполнения задачи:

  1. Блокировка, которая говорит "задача выполняется"
  2. Набор блокировок, которые говорят "задача выполняется против сущности * 1029"* "

... где задаче из block_everything_tasks требуется эксклюзивная блокировка в (1), в то время как задачи из entity_tasks могут совместно использовать блокировку в (1) друг с другом, но необходимоисключительная блокировка на (2).

Наиболее явный способ реализовать это через консультативные блокировки , которые позволяют вам "блокировать" произвольные целые числа, которые содержат некоторое значение, специфичное для приложения.

Предполагая, что ни у одного объекта нет идентификатора 0, давайте использовать его для блокировки верхнего уровня "задача выполняется".Затем, после успешного извлечения задачи из очереди, будет выполняться каждая исключительная задача:

SELECT pg_advisory_xact_lock(0);

... и каждая задача для каждого объекта будет выполняться:

SELECT pg_advisory_xact_lock_shared(0);
SELECT pg_advisory_xact_lock(<entity_id of selected task>);

Основная проблема с консультативной блокировкой состоит в том, что каждый пользователь базы данных должен договориться о том, что означают эти целые числа, или они могут в конечном итоге бороться за одну и ту же блокировку в несвязанных целях.Двухпараметрические (int,int) перегрузки функций блокировки позволяют ограничивать ваши блокировки конкретным случаем использования, но это не очень помогает, когда ваши идентификаторы bigint с.

Если вы не увереныВы единственный в своей базе данных, который использует консультативные блокировки, вы можете эмулировать это с помощью табличного подхода.Настройте таблицу:

CREATE_TABLE currently_processing (
  entity_id bigint PRIMARY KEY
);

... затем для исключительных задач:

LOCK currently_processing;

... и для задач для отдельных объектов:

INSERT INTO currently_processing VALUES (<entity_id of selected task>);
<run the task>
DELETE FROM currently_processing WHERE entity_id = <entity_id of selected task>;

INSERT s попытается получить общую блокировку таблицы (заблокированную исключительной задачей), а уникальный индекс PRIMARY KEY вызовет одновременное блокирование INSERT s для того же идентификатора до фиксации конфликтующей транзакции.или откатывается.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...