Каков наилучший способ реализации таблицы очереди сообщений в MySQL? - PullRequest
29 голосов
/ 08 января 2009

Я, наверное, в десятый раз внедряю что-то подобное, и я никогда не был на 100% доволен решениями, которые я придумал.

Причина использования таблицы mysql вместо «правильной» системы обмена сообщениями привлекательна в первую очередь потому, что большинство приложений уже используют какую-то реляционную базу данных для других вещей (которые, как правило, являются mysql для большинства вещей, которые я делал), в то время как очень немногие приложения используют систему обмена сообщениями. Кроме того, реляционные базы данных имеют очень сильные свойства ACID, в то время как системы обмена сообщениями часто не имеют.

Первая идея заключается в использовании:

create table jobs(
  id auto_increment not null primary key,
  message text not null,
  process_id varbinary(255) null default null,
  key jobs_key(process_id) 
);

И тогда enqueue выглядит так:

insert into jobs(message) values('blah blah');

И dequeue выглядит так:

begin;
select * from jobs where process_id is null order by id asc limit 1;
update jobs set process_id = ? where id = ?; -- whatever i just got
commit;
-- return (id, message) to application, cleanup after done

Стол и очередь выглядят хорошо, но dequeue как бы беспокоит меня. Какова вероятность отката? Или заблокировать? Какие ключи я должен использовать, чтобы сделать его O (1) -иш?

Или есть ли лучшее решение, чем то, что я делаю?

Ответы [ 7 ]

25 голосов
/ 05 сентября 2011

Ваша очередь может быть более краткой. Вместо того чтобы полагаться на откат транзакции, вы можете сделать это в одном атомарном операторе без явной транзакции:

UPDATE jobs SET process_id = ? WHERE process_id IS NULL ORDER BY ID ASC LIMIT 1;

Затем вы можете выполнять задания с помощью (скобки [] означают необязательные, в зависимости от ваших данных):

SELECT * FROM jobs WHERE process_id = ? [ORDER BY ID LIMIT 1];
7 голосов
/ 08 января 2009

Я построил несколько систем очередей сообщений, и я не уверен, к какому типу сообщений вы обращаетесь, но в случае удаления из очереди (это слово?) Я сделал то же самое, что и вы сделали. Ваш метод выглядит простым, чистым и солидным. Не то чтобы моя работа была лучшей, но она оказалась очень эффективной для большого мониторинга многих сайтов. (регистрация ошибок, массовые почтовые маркетинговые кампании, уведомления в социальных сетях)

Мой голос: не беспокойтесь!

6 голосов
/ 08 января 2009

Брайан Акер некоторое время назад говорил о механизме очереди . Говорили и о синтаксисе SELECT table FROM DELETE.

Если вас не беспокоит пропускная способность, вы всегда можете использовать SELECT GET_LOCK () в качестве мьютекса. Например:

SELECT GET_LOCK('READQUEUE');
SELECT * FROM jobs;
DELETE FROM JOBS WHERE ID = ?;
SELECT RELEASE_LOCK('READQUEUE');

А если вы хотите по-настоящему придумать, оберните его в хранимую процедуру.

1 голос
/ 15 апреля 2016

Вот решение, которое я использовал, работая без process_id текущего потока или блокируя таблицу.

SELECT * from jobs ORDER BY ID ASC LIMIT 0,1;

Получите результат в массиве $ row и выполните:

DELETE from jobs WHERE ID=$row['ID'];

Затем получите затронутые строки (mysql_affered_rows). Если есть затронутые строки, обработайте задание в массиве $ row. Если затронуто 0 строк, это означает, что какой-то другой процесс уже обрабатывает выбранное задание. Повторяйте вышеупомянутые шаги, пока не будет строк.

Я проверил это с таблицей 'jobs', имеющей 100 тыс. Строк и порождающей 20 параллельных процессов, которые выполняют вышеописанное. Никаких условий гонки не произошло. Вы можете изменить вышеупомянутые запросы, чтобы обновить строку с флагом обработки и удалить строку после того, как вы фактически обработали ее:

while(time()-$startTime<$timeout)
{
SELECT * from jobs WHERE processing is NULL ORDER BY ID ASC LIMIT 0,1;
if (count($row)==0) break;
UPDATE jobs set processing=1 WHERE ID=$row['ID'];
if (mysql_affected_rows==0) continue;
//process your job here
DELETE from jobs WHERE ID=$row['ID'];
}

Само собой разумеется, вы должны использовать правильную очередь сообщений (ActiveMQ, RabbitMQ и т. Д.) Для такого рода работы. Нам пришлось прибегнуть к этому решению, так как наш хост регулярно ломает вещи при обновлении программного обеспечения, поэтому чем меньше вещей ломать, тем лучше.

1 голос
/ 08 января 2009

Этот поток содержит информацию о дизайне, которая должна быть отображаемой.

Цитировать:

Вот что я успешно использовал в прошлом:

Схема таблицы MsgQueue

Идентификатор MsgId - НЕ NULL
MsgTypeCode varchar (20) - НЕ NULL
SourceCode varchar (20) - процесс вставки сообщения - NULLable
State char (1) - «Новый, если поставлен в очередь», «A» (ctive), если обрабатывается, «C» завершен, по умолчанию «N» - NOT NULL
CreateTime datetime - значение по умолчанию GETDATE () - НЕ NULL
Msg varchar (255) - NULLable

Ваши типы сообщений - это то, что вы ожидаете - сообщения, которые соответствуют контракту между вставкой процесса (-ов) и чтением процесса (-ов), структурированы с помощью XML или другого вашего выбора представления (JSON может пригодиться в некоторых случаях). случаи, например).

Тогда процессы 0-к-n могут быть вставлены, а процессы 0-к-n могут считывать и обрабатывать сообщения. Каждый процесс чтения обычно обрабатывает один тип сообщения. Для балансировки нагрузки может быть запущено несколько экземпляров типа процесса.

Читатель извлекает одно сообщение и меняет состояние на «А», пока работает над ним. Когда это сделано, он меняет состояние на «C». Он может удалить сообщение или нет в зависимости от того, хотите ли вы сохранить контрольный журнал. Сообщения состояния = 'N' извлекаются в порядке MsgType / Timestamp, поэтому есть индекс для MsgType + State + CreateTime.

Варианты:
Состояние для "E" rror.
Столбец для кода процесса Reader.
Отметки времени для переходов между состояниями.

Это обеспечило хороший, масштабируемый, видимый, простой механизм для выполнения ряда вещей, которые вы описываете. Если у вас есть общее представление о базах данных, это довольно надежно и расширяемо. Никогда не было проблем с откатом блокировок и т. Д. Из-за транзакций перехода атомарного состояния.

1 голос
/ 08 января 2009

Я бы предложил использовать Quartz.NET

У него есть поставщики для SQL Server, Oracle, MySql, SQLite и Firebird.

0 голосов
/ 28 марта 2019

Вы можете иметь промежуточную таблицу для поддержания смещения для очереди.

create table scan(
  scan_id int primary key,
  offset_id int
);

Возможно, у вас также есть несколько сканирований, следовательно, одно смещение на сканирование. Инициализируйте offset_id = 0 в начале сканирования.

begin;
select * from jobs where order by id where id > (select offset_id from scan where scan_id = 0)  asc limit 1;
update scan set offset_id = ? where scan_id = ?; -- whatever i just got
commit;

Все, что вам нужно сделать, это просто сохранить последнее смещение. Это также сэкономит вам много места (process_id для каждой записи). Надеюсь, это звучит логично.

...