Выберите разблокированную строку в Postgresql - PullRequest
41 голосов
/ 23 декабря 2008

Есть ли способ выбрать строки в Postgresql, которые не заблокированы? У меня есть многопоточное приложение, которое будет делать:

Select... order by id desc limit 1 for update

на столе.

Если этот запрос выполняется несколькими потоками, они оба пытаются вернуть одну и ту же строку.

Один получает блокировку строки, другой блокирует, а затем отказывает после того, как первый обновляет строку. Что мне действительно нужно, так это чтобы второй поток получил первую строку, которая соответствует предложению WHERE и еще не заблокирована.

Чтобы уточнить, я хочу, чтобы каждый поток немедленно обновлял первую доступную строку после выполнения выбора.

Таким образом, если есть строки с ID: 1,2,3,4, вступит первый поток, выберите строку с ID=4 и немедленно обновите ее.

Если во время этой транзакции приходит второй поток, я бы хотел получить строку с ID=3 и немедленно обновить эту строку.

Поскольку Share не выполнит этого ни с nowait, так как предложение WHERE будет соответствовать заблокированной строке (ID=4 in my example). В основном я хотел бы что-то вроде «И НЕ БЛОКИРОВАТЬ» в предложении WHERE.

Users

-----------------------------------------
ID        | Name       |      flags
-----------------------------------------
1         |  bob       |        0
2         |  fred      |        1
3         |  tom       |        0
4         |  ed        |        0

Если запрос "Select ID from users where flags = 0 order by ID desc limit 1" и когда возвращается строка, следующая вещь "Update Users set flags = 1 where ID = 0", тогда я бы хотел, чтобы первый поток захватил строку с ID 4, а следующий - для возьмите строку с ID 3.

Если я добавляю «For Update» к выбору, то первый поток получает строку, второй блокирует, а затем ничего не возвращает, потому что как только первая транзакция фиксирует, предложение WHERE больше не выполняется.

Если я не использую "For Update", тогда мне нужно добавить предложение WHERE при последующем обновлении (WHERE flags = 0), чтобы только один поток мог обновить строку.

Второй поток выберет ту же строку, что и первый, но обновление второго потока завершится неудачно.

В любом случае второй поток не может получить строку и обновить ее, потому что я не могу заставить базу данных передать строку 4 первому потоку и строку 3 второму потоку, когда транзакции перекрываются.

Ответы [ 14 ]

26 голосов
/ 11 октября 2014

Эта функция SELECT ... SKIP LOCKED реализуется в Postgres 9.5. http://www.depesz.com/2014/10/10/waiting-for-9-5-implement-skip-locked-for-row-level-locks/

8 голосов
/ 14 июля 2010

Нет Нет, НООО: -)

Я знаю, что имеет в виду автор. У меня похожая ситуация, и я нашел хорошее решение. Сначала я начну с описания моей ситуации. У меня есть таблица, в которой я храню сообщения, которые должны быть отправлены в определенное время. PG не поддерживает синхронизацию выполнения функций, поэтому мы должны использовать демоны (или cron). Я использую пользовательский сценарий, который открывает несколько параллельных процессов. Каждый процесс выбирает набор сообщений, которые должны быть отправлены с точностью +1 сек / -1 сек. Сама таблица динамически обновляется новыми сообщениями.

Таким образом, каждый процесс должен загружать набор строк. Этот набор строк не может быть загружен другим процессом, потому что он создаст много путаницы (некоторые люди получат пару сообщений, когда они должны получить только одно). Вот почему нам нужно заблокировать строки. Запрос на скачивание набора сообщений с блокировкой:

FOR messages in select * from public.messages where sendTime >= CURRENT_TIMESTAMP - '1 SECOND'::INTERVAL AND sendTime <= CURRENT_TIMESTAMP + '1 SECOND'::INTERVAL AND sent is FALSE FOR UPDATE LOOP
-- DO SMTH
END LOOP;

процесс с этим запросом запускается каждые 0,5 сек. Таким образом, это приведет к следующему запросу, ожидающему первую блокировку, чтобы разблокировать строки. Этот подход создает огромные задержки. Даже когда мы используем NOWAIT, запрос приведет к исключению, которое нам не нужно, поскольку в таблице могут быть новые сообщения, которые необходимо отправить. Если использовать просто FOR SHARE, запрос будет выполнен правильно, но все же это займет много времени, создавая огромные задержки.

Чтобы заставить это работать, мы делаем немного магии:

  1. изменение запроса:

    FOR messages in select * from public.messages where sendTime >= CURRENT_TIMESTAMP - '1 SECOND'::INTERVAL AND sendTime <= CURRENT_TIMESTAMP + '1 SECOND'::INTERVAL AND sent is FALSE AND is_locked(msg_id) IS FALSE FOR SHARE LOOP
    -- DO SMTH
    END LOOP;
    
  2. таинственная функция is_locked (msg_id) выглядит следующим образом:

    CREATE OR REPLACE FUNCTION is_locked(integer) RETURNS BOOLEAN AS $$
    DECLARE
        id integer;
        checkout_id integer;
        is_it boolean;
    BEGIN
        checkout_id := $1;
        is_it := FALSE;
    
        BEGIN
            -- we use FOR UPDATE to attempt a lock and NOWAIT to get the error immediately 
            id := msg_id FROM public.messages WHERE msg_id = checkout_id FOR UPDATE NOWAIT;
            EXCEPTION
                WHEN lock_not_available THEN
                    is_it := TRUE;
        END;
    
        RETURN is_it;
    
    END;
    $$ LANGUAGE 'plpgsql' VOLATILE COST 100;
    

Конечно, мы можем настроить эту функцию для работы с любой таблицей в вашей базе данных. На мой взгляд, лучше создать одну функцию проверки для одной таблицы. Добавление большего количества вещей в эту функцию может сделать ее только медленнее. В любом случае, я проверяю это предложение дольше, поэтому нет необходимости делать его еще медленнее. Для меня это полное решение, и оно отлично работает.

Теперь, когда у меня параллельно работают 50 процессов, у каждого процесса есть уникальный набор свежих сообщений для отправки. После отправки я просто обновляю строку с параметром sent = TRUE и больше никогда не возвращаюсь к ней.

Я надеюсь, что это решение также будет работать для вас (автор). Если у вас есть какие-либо вопросы, просто дайте мне знать: -)

О, и дайте мне знать, если это сработало и у вас.

6 голосов
/ 25 декабря 2010

Я использую что-то вроде этого:

select  *
into l_sms
from sms
where prefix_id = l_prefix_id
    and invoice_id is null
    and pg_try_advisory_lock(sms_id)
order by suffix
limit 1;

и не забудьте вызвать pg_advisory_unlock

4 голосов
/ 25 декабря 2010

Если вы пытаетесь реализовать очередь, взгляните на PGQ, который уже решил эту и другие проблемы. http://wiki.postgresql.org/wiki/PGQ_Tutorial

2 голосов
/ 03 февраля 2009

Похоже, что вы пытаетесь сделать что-то вроде получения элемента с наивысшим приоритетом в очереди, о котором другой процесс еще не позаботился.

Вероятное решение - добавить предложение where, ограничивающее его необработанными запросами:

select * from queue where flag=0 order by id desc for update;
update queue set flag=1 where id=:id;
--if you really want the lock:
select * from queue where id=:id for update;
...

Надеемся, что вторая транзакция заблокируется, пока происходит обновление флага, затем она сможет продолжить, но флаг ограничит ее до следующего в строке.

Также вероятно, что используя сериализуемый уровень изоляции, вы можете получить желаемый результат без всего этого безумия.

В зависимости от характера вашего приложения, могут быть более эффективные способы реализации этого, чем в базе данных, такой как канал FIFO или LIFO. Кроме того, может оказаться возможным отменить порядок, в котором они вам нужны, и использовать последовательность для обеспечения их последовательной обработки.

1 голос
/ 23 декабря 2008

Это может быть выполнено с помощью SELECT ... NOWAIT; Например, здесь .

0 голосов
/ 17 ноября 2011

Используется в многопоточности и кластере?
Как насчет этого?

START TRANSACTION;

// All thread retrive same task list
// If result count is very big, using cursor 
//    or callback interface provied by ORM frameworks.
var ids = SELECT id FROM tableName WHERE k1=v1;

// Each thread get an unlocked recored to process.
for ( id in ids ) {
   var rec = SELECT ... FROM tableName WHERE id =#id# FOR UPDATE NOWAIT;
   if ( rec != null ) {
    ... // do something
   }
}

COMMIT;
0 голосов
/ 11 апреля 2011

Мое решение - использовать оператор UPDATE с предложением RETURNING.

Users

-----------------------------------
ID        | Name       |      flags
-----------------------------------
1         |  bob       |        0  
2         |  fred      |        1  
3         |  tom       |        0   
4         |  ed        |        0   

Вместо SELECT .. FOR UPDATE используйте

BEGIN; 

UPDATE "Users"
SET ...
WHERE ...;
RETURNING ( column list );

COMMIT;

Поскольку инструкция UPDATE получает блокировку ROW EXCLUSIVE для таблицы, при ее обновлении вы получаете сериализованные обновления. Чтения по-прежнему разрешены, но они видят данные только до начала транзакции UPDATE.

Ссылка: Контроль параллелизма Глава Pg docs.

0 голосов
/ 07 декабря 2009

^^ это работает. подумайте о том, чтобы иметь «немедленный» статус «заблокирован».

Допустим, ваш стол такой:

id | имя | фамилия | статус

И возможные состояния, например: 1 = ожидает, 2 = заблокирован, 3 = обработан, 4 = сбой, 5 = отклонен

Каждая новая запись вставляется с ожидающим статус (1)

Ваша программа: «обновляет mytable set status = 2, где id = (выберите id из mytable, где имя, например,«% John% »и status = 1 limit 1), возвращает id, имя, фамилию»

Тогда ваша программа делает свое дело, и если она приходит к выводу, что этот поток вообще не должен был обрабатывать эту строку, она делает: msgstr "обновить статус набора таблиц = 1, где id =?"

В остальном он обновляет другие статусы.

0 голосов
/ 25 июня 2009

Я столкнулся с той же проблемой в нашем приложении и нашел решение, очень похожее на подход Гранта Джонсона. Канал FIFO или LIFO не был возможен, потому что у нас есть кластер серверов приложений, обращающихся к одной БД. То, что мы делаем, это

SELECT ... WHERE FLAG=0 ... FOR UPDATE
, за которым сразу следует
UPDATE ... SET FLAG=1 WHERE ID=:id
как можно скорее, чтобы сохранить время блокировки как можно меньше. В зависимости от количества и размеров столбцов таблицы может быть полезно выбрать идентификатор только при первом выборе и после того, как вы отметили строку, чтобы получить оставшиеся данные. Хранимая процедура может еще больше сократить количество обращений.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...