Очередь заданий в виде таблицы SQL с несколькими потребителями (PostgreSQL) - PullRequest
33 голосов
/ 28 июня 2011

У меня типичная проблема производитель-потребитель:

Несколько приложений-производителей записывают запросы заданий в таблицу заданий в базе данных PostgreSQL.

В запросах заданий есть поле состояния, которое начинается с QUEUED при создании.

Существует несколько пользовательских приложений, которые уведомляются правилом, когда производитель добавляет новую запись:

CREATE OR REPLACE RULE "jobrecord.added" AS
  ON INSERT TO jobrecord DO 
  NOTIFY "jobrecordAdded";

Они попытаются зарезервировать новую запись, установив для нее состояние RESERVED. Конечно, только на потребителя должно получиться. Все остальные потребители не должны иметь возможность зарезервировать ту же запись. Вместо этого они должны зарезервировать другие записи с состоянием = QUEUED.

Пример: какой-то производитель добавил следующие записи в таблицу jobrecord :

id state  owner  payload
------------------------
1 QUEUED null   <data>
2 QUEUED null   <data>
3 QUEUED null   <data>
4 QUEUED null   <data>

теперь два потребителя A , B хотят обработать их. Они начинают работать одновременно. Один должен зарезервировать идентификатор 1, другой должен зарезервировать идентификатор 2, затем первый, кто заканчивает, должен зарезервировать идентификатор 3 и т. Д.

В чистом многопоточном мире я бы использовал мьютекс для управления доступом к очереди заданий, но потребители - это разные процессы, которые могут выполняться на разных машинах. Они имеют доступ только к одной и той же базе данных, поэтому вся синхронизация должна осуществляться через базу данных.

Я прочитал много документации о параллельном доступе и блокировке в PostgreSQL, например. http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html Выбрать разблокированную строку в Postgresql PostgreSQL и блокировка

Из этих тем я узнал, что следующий SQL-оператор должен делать то, что мне нужно:

UPDATE jobrecord
  SET owner= :owner, state = :reserved 
  WHERE id = ( 
     SELECT id from jobrecord WHERE state = :queued 
        ORDER BY id  LIMIT 1 
     ) 
  RETURNING id;  // will only return an id when they reserved it successfully

К сожалению, когда я запускаю это в нескольких пользовательских процессах, примерно в 50% времени они по-прежнему резервируют одну и ту же запись, обрабатывая и перезаписывая изменения другой.

Что мне не хватает? Как мне написать оператор SQL, чтобы несколько потребителей не зарезервировали одну и ту же запись?

Ответы [ 7 ]

32 голосов
/ 15 июля 2011

Я использую postgres для очереди FIFO. Первоначально я использовал ACCESS EXCLUSIVE, который дает правильные результаты при высоком параллелизме, но имеет неприятный эффект взаимоисключения с pg_dump, который получает блокировку ACCESS SHARE во время его выполнения. Это заставляет мою функцию next () блокироваться в течение очень долгого времени (продолжительность pg_dump). Это было неприемлемо, так как мы работаем круглосуточно, и покупателям не нравилось мертвое время в очереди посреди ночи.

Я полагал, что должна быть менее ограничительная блокировка, которая все еще была бы безопасна для одновременной работы и не блокировалась во время работы pg_dump. Мой поиск привел меня к такому сообщению.

Тогда я провел небольшое исследование.

Следующие режимы достаточны для функции NEXT () очереди FIFO, которая будет обновлять статус задания с в очереди до при выполнении без какого-либо сбоя параллелизма, а также не блокировать pg_dump:

SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE

Запрос:

begin;
lock table tx_test_queue in exclusive mode;
update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1
    )
returning job_id;
commit;

Результат выглядит так:

UPDATE 1
 job_id
--------
     98
(1 row)

Вот сценарий оболочки, который проверяет все различные режимы блокировки при высоком параллелизме (30).

#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock                    FAIL
# accessShare               FAIL
# rowShare                  FAIL
# rowExclusive              FAIL
# shareUpdateExclusive      SUCCESS
# share                     FAIL+DEADLOCKS
# shareRowExclusive         SUCCESS
# exclusive                 SUCCESS
# accessExclusive           SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
    "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    *) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";

Код также здесь, если вы хотите редактировать: https://gist.github.com/1083936

Я обновляю свое приложение, чтобы использовать режим EXCLUSIVE, так как это наиболее строгий режим, который а) является правильным и б) не конфликтует с pg_dump. Я выбрал самое ограничительное, так как оно кажется наименее рискованным с точки зрения изменения приложения с ACCESS EXCLUSIVE, не будучи супер-экспертом в блокировке postgres.

Я чувствую себя довольно комфортно с моей испытательной установкой и общими идеями, стоящими за ответом. Я надеюсь, что разделение этого поможет решить эту проблему для других.

15 голосов
/ 19 мая 2015

Для этого не нужно делать целую блокировку таблицы: \.

Блокировка строки, созданная с помощью for update, работает отлично.

См. https://gist.github.com/mackross/a49b72ad8d24f7cefc32 для измененияЯ сделал ответ Апинштейну и убедился, что он все еще работает.

Окончательный код:

update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1 for update
    )
returning job_id;
5 голосов
/ 11 ноября 2016

а как же просто выбрать?

SELECT * FROM table WHERE status = 'QUEUED' LIMIT 10 FOR UPDATE SKIP LOCKED;

https://www.postgresql.org/docs/9.5/static/sql-select.html#SQL-FOR-UPDATE-SHARE

5 голосов
/ 28 июня 2011

Прочтите мой пост здесь:

Согласованность в postgresql с блокировкой и выберите для обновления

Если вы используете транзакцию и LOCK TABLE, у вас не возникнет проблем.

2 голосов
/ 01 октября 2012

Возможно, вы захотите посмотреть, как это делает queue_classic.https://github.com/ryandotsmith/queue_classic

Код довольно короткий и простой для понимания.

0 голосов
/ 30 июня 2011

Хорошо, вот решение, которое работает для меня, основываясь на ссылке от jordani.Поскольку некоторые из моих проблем заключались в том, как работает Qt-SQL, я включил код Qt:

QSqlDatabase db = GetDatabase();
db.transaction();
QSqlQuery lockQuery(db);
bool lockResult = lockQuery.exec("LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; ");
QSqlQuery query(db);
query.prepare(    
"UPDATE jobrecord "
"  SET \"owner\"= :owner, state = :reserved "
"  WHERE id = ( "
"    SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 "
"  ) RETURNING id;"
);
query.bindValue(":owner", pid);
query.bindValue(":reserved", JobRESERVED);
query.bindValue(":queued", JobQUEUED); 
bool result = query.exec();

Чтобы проверить, обрабатывает ли несколько потребителей одну и ту же работу, я добавил правило и журнал.таблица:

CREATE TABLE serverjobrecord_log
(
  serverjobrecord_id integer,
  oldowner text,
  newowner text
) WITH ( OIDS=FALSE );


CREATE OR REPLACE RULE ownerrule AS ON UPDATE TO jobrecord
WHERE old.owner IS NOT NULL AND new.state = 1 
DO INSERT INTO jobrecord_log     (id, oldowner, newowner) 
    VALUES (new.id, old.owner, new.owner);

Без оператора LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; таблица журнала иногда заполняется записями, если один потребитель перезаписывает значения другого, но с помощью оператора LOCK таблица журнала остается пустой:-)

0 голосов
/ 28 июня 2011

Проверьте PgQ вместо того, чтобы изобретать велосипед.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...