Согласованность в postgresql с блокировкой и выберите для обновления - PullRequest
2 голосов
/ 27 июня 2011

У меня есть приложение, которое может поддерживать определенное количество одновременных действий. Это представлено таблицей «слотов» в postgres. Когда узлы подключаются, они вставляют в таблицу несколько строк, по одной на слот. Когда задания запрашивают слоты, они обновляют строку в таблице, запрашивая один из слотов, и освобождают ее по окончании.

Таблица слотов выглядит следующим образом:

CREATE TABLE slots (
    id INT8 PRIMARY KEY DEFAULT nextval('slots_seq'),
    node_name TEXT NOT NULL,
    job_name TEXT
);

В любое время в нем может быть несколько фиксированное количество строк, в каждой из которых может быть указано или нет заполненное задание.

Когда новое задание хочет запустить, оно выполняет следующие запросы, чтобы получить имя узла, на котором оно должно выполняться:

BEGIN;
LOCK TABLE slots IN ACCESS EXCLUSIVE MODE;
SELECT id, node_name
    FROM slots
    WHERE job_name IS NULL
    LIMIT 1
    FOR UPDATE;

(имя_узла и идентификатор считываются из курсора)

UPDATE slots
    SET job_name = %(job_name)s
    WHERE id = %(slot_id)s;
COMMIT;

Это часто позволяет запрашивать строки без потери каких-либо обновлений, но с более высоким уровнем параллелизма будет запрашиваться только несколько строк, в то время как было выполнено много запросов SELECT ... FOR UPDATE и UPDATE. В результате мы получаем гораздо больше выполняемых заданий, чем просто мест для них.

Я делаю ошибку блокировки? Есть ли лучший способ сделать это? Что-то, что не использует блокировки таблиц?

Уровень транзакции SERIALIZABLE не обрезает его, заполняется только несколько строк.

Я использую postgresql версии 8.4.

Ответы [ 3 ]

2 голосов
/ 28 июня 2011

Ну, я написал программу на Perl для имитации того, что происходит, так как я не думал, что то, что вы говорите, возможно. Действительно, после запуска симуляции у меня не было никаких проблем, даже когда я отключил блокировку (поскольку SELECT … FOR UPDATE и UPDATE должны выполнить необходимую блокировку).

Я запустил это на PG 8.3 и PG 9.0, и он хорошо работал в обоих местах.

Я призываю вас попробовать программу и / или попробовать версию на Python, чтобы получить хороший жесткий тест-кейс, которым вы можете поделиться с классом. Если это работает, вы можете исследовать, в чем различия, и если это не работает, у вас есть что-то, с чем могут играть другие люди.

#!/usr/bin/perl
use DBI;
$numchild = 0;
$SIG{CHLD} = sub { if (wait) {$numchild--;} };

sub worker($)
{
  my ($i) = @_;
  my ($job);

  my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});

  my ($x) = 0;
  while(++$x)
  {
#    $dbh->do("lock table slots in access exclusive mode;") || die "Cannot lock at $i\n";
    my @id = $dbh->selectrow_array("select id from slots where job_name is NULL LIMIT 1 FOR UPDATE;");

    if ($#id < 0)
    {
      $dbh->rollback;
      sleep(.5);
      next;
    }
    $job = "$$-$i-($x)";
    $dbh->do("update slots set job_name='$job' where id=$id[0];") || die "Cannot update at $i\n";
    $dbh->commit || die "Cannot commit\n";
    last;
  }
  if (!$job)
  {
    print STDERR "Could not find slots in 5 attempts for $i $$\n" if ($ENV{'verbose'});
    return;
  }
  else
  {
    print STDERR "Got $job\n" if ($ENV{'verbose'} > 1);
  }
  sleep(rand(5));

#  $dbh->do("lock table slots in access exclusive mode;") || die "Cannot lock at $i\n";
  $dbh->do("update slots set usage=usage+1, job_name = NULL where job_name='$job';") || die "Cannot unlock $job";
  print STDERR "Unlocked $job\n" if ($ENV{'verbose'} > 2);
  $dbh->commit || die "Cannot commit";
}

my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});

$dbh->do("drop table slots;");
$dbh->commit;
$dbh->do("create table slots (id serial primary key, job_name text, usage int);") || die "Cannot create\n";
$dbh->do("insert into slots values (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0);") || die "Cannot insert";
$dbh->commit;

for(my $i=0;$i<200;$i++)
{
  if (!fork)
  {
    worker($i);
    exit(0);
  }

  if (++$numchild > 50)
  {
    sleep(1);
  }
}
while (wait > 0)
{
  $numchild--;
  print "Waiting numchild $numchild\n";
  sleep(1);
}
my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});
my $slots = $dbh->selectall_arrayref("select * from slots;") || die "Cannot do final select";
my $sum=0;
foreach my $slot (@$slots)
{
  printf("%02d %3d %s\n",$slot->[0], $slot->[2], $slot->[1]);
  $sum += $slot->[2];
}
print "Successfully made $sum entries\n";
2 голосов
/ 28 июня 2011
BEGIN; 
LOCK TABLE slots IN ACCESS EXCLUSIVE MODE; 
UPDATE slots SET job_name = '111' WHERE id IN (SELECT id FROM slots WHERE job_name IS NULL LIMIT 1) RETURNING *;
COMMIT;

Кажется, это работает в Read Committed. Это только sql (так же, как ваш код) и может быть выполнено за один вызов (быстрее).

@ Сет Робертсон: Без LOCK TABLE и без цикла while небезопасно.

Если есть транзакция A и транзакция B одновременно: A выберет первую строку, а B выберет первую строку. A заблокирует и обновит строку, B придется подождать, пока A не совершит. Затем B перепроверит условие job_name IS NULL. Значение false, и B не будет обновляться - B не выберет следующую строку, а будет только перепроверять и возвращать пустой результат.

@ joegester: ВЫБРАТЬ ДЛЯ ОБНОВЛЕНИЯ не проблема, потому что все таблицы заблокированы.

Может быть, есть другой способ выполнить работу - если вы удаляете и вставляете строки (в другую таблицу?) Вместо установки NULL. Но я не уверен, как.

1 голос
/ 28 июня 2011

Возможно, вы захотите взглянуть на рекомендацию блокировки .

Не тестировали, но может быть возможно переписать ваш запрос блокировки следующим образом:

BEGIN;
SELECT id, node_name
    FROM slots
    WHERE job_name IS NULL
    AND pg_try_advisory_lock('slots'::regclass::int, id::int)
    LIMIT 1;

или, поскольку вы сначала используете bigint (вам нужно столько идентификаторов?!?), Что-то вроде:

BEGIN;
SELECT id, node_name
    FROM slots
    WHERE job_name IS NULL
    AND pg_try_advisory_lock(hashtext('slots_' || id))
    LIMIT 1;

BeОсторожно, если вы это сделаете - консультативная блокировка должна быть явно разблокирована за сеанс независимо от того, успешна ли транзакция или нет.

В этом случае также существует риск коллизиииз hashtext() но это не имеет большого значения для вас, если вы обрабатываете задания ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...