Существует ли шаблон проектирования для архитектуры amqp для обработки сообщений фиксации / отката? - PullRequest
4 голосов
/ 14 октября 2011

У меня есть простой amqp производителя / потребителя, настроенный так:

producer -> e1:jobs_queue -> consumer -> e2:results_queue -> result_handler

Производитель отправляет некоторое количество заданий.Потребитель обрабатывает задания по одному и обрабатывает их, помещая результат в другую очередь.Затем они извлекаются обработчиком result_handler, который публикует результаты в базе данных.

Иногда потребитель терпит неудачу - он может быть убит ОС или вызвать исключение.Если это происходит во время обработки сообщения, то это сообщение теряется, соответствующий результат не выдается, и мне грустно.Я был бы счастлив снова, если невыполненное задание было поставлено в очередь.

Мне нужен шаблон проектирования для обеспечения того, чтобы либо потребитель обработал задание до завершения и поставил соответствующийрезультат в * results_queue *, или в случае неудачи задание возвращается в * jobs_queue *.Поскольку потребитель является причиной сбоя, потребитель не должен отвечать за управление сообщениями, относящимися к его собственному наблюдению.

Мы знаем, что потребитель не удалось обработать задание, если:

  • оно взяло задание из * job_queue * и по истечении некоторого времени ожидания не было получено
  • оно взяло задание из * job_queue *, а затемумер

Для моего приложения мы, вероятно, можем зафиксировать 2-й случай, просто ожидая тайм-аут обработки задания.На производстве будет много рабочих, которые будут контролировать, все они будут извлекать задания из общего списка заданий и помещать результаты в единый обмен / очередь результатов.

1 Ответ

2 голосов
/ 22 ноября 2012

Самый простой способ достичь желаемого - вручную обрабатывать подтверждения для полученных сообщений. В node-amqp это так же просто, как добавление опции { ack: true } к вызову queue.subscribe. Затем вы можете подтвердить сообщения, вызвав некоторую функцию в очереди. В случае node-amqp это queue.shift().

Вы также можете установить количество неподтвержденных сообщений, разрешенных потребителю, используя prefetchCount.

Любые неподтвержденные сообщения теперь будут доставляться (всем подключенным потребителям), если потребитель отключается.

Также установив очередь на durable и autoDelete: false, вы можете дополнительно убедиться, что очередь (и сообщения на ней) не будут удалены при перезапуске вашего MQ-сервера или отключении последнего потребителя.

...