RabbitMQ - Почему неправильный подписчик получает опубликованное сообщение? - PullRequest
0 голосов
/ 23 октября 2018

У меня есть две службы: Диспетчер и Сборщик .

  1. Диспетчер подписан на Очередь COLLECTED_USER с routingKey user.collected и вызывает обработчик UserCollected.
  2. Сборщик подписывается на очередь COLLECT_USER с routingKey user.collect и вызывает обработчик CollectUser.

Может быть несколько сборщиков, поэтому я установил exclusive на false (см. Код ниже).

Существуют также другие службы, которые прослушивают такие события, как

  • user.created,
  • user.updated,
  • user.deleted

Кроме того, существуют службы, которые прослушивают более общие события, такие как

  • #.created
  • user.#

и т. Д.

Поэтому я использую topic exchange.

Настройка

| exchange | type  | routingKey     | queueName      |
| -------- | ----- | -------------- | -------------  |
| MY_APP   | topic | user.collect   | COLLECT_USER   |
| MY_APP   | topic | user.collected | COLLECTED_USER |

Что должно произойти:

  1. Диспетчер публикует сообщение с маршрутизациейKey user.collect
  2. Collector получает сообщение user.collect и вызывает обработчик CollectUser
  3. Обработчик CollectUser коллектора работает, а затем публикует сообщение с routingKey user.collected
  4. Диспетчер получает сообщение user.collected и вызывает обработчик UserCollected

Что на самом деле происходит:

  1. Диспетчер публикует сообщение с routingKey user.collect (правильно)
  2. Сборщик получает сообщение user.collect и вызывает обработчик CollectUser (правильный)
  3. Менеджер также получает сообщение user.collect и вызывает обработчик UserCollected с неверными данными.(неверно)
  4. Обработчик CollectUser коллектора работает, затем публикует сообщение с routingKey user.collected (правильно)
  5. Менеджер получаетuser.collected и вызывает обработчик UserCollected (правильный)

Мой вопрос

Почему Manager получает сообщение user.collect, учитывая:

  1. Он прослушивает очередь COLLECTED_USER, а не COLLECT_USER, а
  2. Collector , который прослушивает очередь COLLECT_USER,сообщение уже обработано.

Подробности реализации

Создаю подписчиков и издателей следующим образом (обрезано по релевантности)

Создание подписчика

учитывая AMQP url и параметры url, exchange, type, routingKey, queueName и handler

const connection = await amqp.connect(url)
const channel = await connection.createChannel()
channel.assertExchange(exchange, type, { durable: true })
const result = await channel.assertQueue(queueName, { exclusive: false })
channel.bindQueue(result.queue, exchange, routingKey)
channel.prefetch(1)
channel.consume(result.queue, handler)

Создание издателя

с учетом AMQP url и параметров url, exchange и type

const connection = await amqp.connect(url)
const channel = await connection.createChannel()
await channel.assertExchange(exchange, type, { durable: true })

Publishing

с учетом channel и параметров exchange, routingKeyи message

await channel.publish(exchange, routingKey, message)

Примечание

Этот вопрос является продолжением RabbitMQ - Почему мои ключи маршрутизации игнорируются при использовании обмена темами .

1 Ответ

0 голосов
/ 01 ноября 2018

Я наконец понял, в чем была моя проблема.Грязный обмен.Экспериментируя с этим, я непреднамеренно добавил обмен, который направлял сообщения в неправильную очередь, и это вызвало у меня замешательство.

Чтобы исправить это, я запустил графический интерфейс администратора RabbitMQ и удалил все очереди ипозвольте моему коду создать те, которые ему нужны.С кодом, описанным выше, проблем не было.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...