У меня есть две службы: Диспетчер и Сборщик .
- Диспетчер подписан на Очередь
COLLECTED_USER
с routingKey user.collected
и вызывает обработчик UserCollected
. - Сборщик подписывается на очередь
COLLECT_USER
с routingKey user.collect
и вызывает обработчик CollectUser
.
Может быть несколько сборщиков, поэтому я установил exclusive
на false
(см. Код ниже).
Существуют также другие службы, которые прослушивают такие события, как
user.created
, user.updated
, user.deleted
Кроме того, существуют службы, которые прослушивают более общие события, такие как
и т. Д.
Поэтому я использую topic
exchange.
Настройка
| exchange | type | routingKey | queueName |
| -------- | ----- | -------------- | ------------- |
| MY_APP | topic | user.collect | COLLECT_USER |
| MY_APP | topic | user.collected | COLLECTED_USER |
Что должно произойти:
- Диспетчер публикует сообщение с маршрутизациейKey
user.collect
- Collector получает сообщение
user.collect
и вызывает обработчик CollectUser
- Обработчик
CollectUser
коллектора работает, а затем публикует сообщение с routingKey user.collected
- Диспетчер получает сообщение
user.collected
и вызывает обработчик UserCollected
Что на самом деле происходит:
- Диспетчер публикует сообщение с routingKey
user.collect
(правильно) - Сборщик получает сообщение
user.collect
и вызывает обработчик CollectUser
(правильный) - Менеджер также получает сообщение
user.collect
и вызывает обработчик UserCollected
с неверными данными.(неверно) - Обработчик
CollectUser
коллектора работает, затем публикует сообщение с routingKey user.collected
(правильно) - Менеджер получает
user.collected
и вызывает обработчик UserCollected
(правильный)
Мой вопрос
Почему Manager получает сообщение user.collect
, учитывая:
- Он прослушивает очередь
COLLECTED_USER
, а не COLLECT_USER
, а - Collector , который прослушивает очередь
COLLECT_USER
,сообщение уже обработано.
Подробности реализации
Создаю подписчиков и издателей следующим образом (обрезано по релевантности)
Создание подписчика
учитывая AMQP url
и параметры url
, exchange
, type
, routingKey
, queueName
и handler
const connection = await amqp.connect(url)
const channel = await connection.createChannel()
channel.assertExchange(exchange, type, { durable: true })
const result = await channel.assertQueue(queueName, { exclusive: false })
channel.bindQueue(result.queue, exchange, routingKey)
channel.prefetch(1)
channel.consume(result.queue, handler)
Создание издателя
с учетом AMQP url
и параметров url
, exchange
и type
const connection = await amqp.connect(url)
const channel = await connection.createChannel()
await channel.assertExchange(exchange, type, { durable: true })
Publishing
с учетом channel
и параметров exchange
, routingKey
и message
await channel.publish(exchange, routingKey, message)
Примечание
Этот вопрос является продолжением RabbitMQ - Почему мои ключи маршрутизации игнорируются при использовании обмена темами .