У меня есть rabbitListener, который непрерывно асинхронно прослушивает пользовательские сообщения из очереди "user-messages". Все в порядке, пока очередь не загружена массовыми сообщениями. Когда массовые сообщения публикуются в очереди, сообщения того же пользователя обрабатываются первыми, поэтому сообщения других пользователей ожидают своей очереди.
Я не могу использовать Очередь приоритетов , потому что все пользователи имеют одинаковый приоритет. Поэтому я хочу создать новые очереди и прослушивать их в runtime . Все очереди будут недолговечны, как только сообщения будут использованы. (очередь будет удалена)
При просмотре я обнаружил, что очередь может быть создана динамически с помощью RabbitAdmin . Но проблемы заключаются в том, что
- Как заставить слушателя прослушивать новую коротко-живую очередь (TTL), созданную во время выполнения?
- Как заставить слушателя перестать слушать удаленная очередь (по истечении времени TTL), чтобы избежать исключений?
В настоящее время я использую SimpleMessageListenerContainerFactory. У меня нет проблем, чтобы использовать DirectMessageListenerContainer, а также. Мое единственное беспокойство - , как сообщить о создании и удалении динамической c очереди слушателю . Думая о https://www.rabbitmq.com/event-exchange.html (плагин обмена событиями).
Есть ли способ, с помощью которого spring-amqp поддерживает start / stop динамические очереди c. Заранее спасибо.
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(config.getConnectionFactory());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(3);
return factory;
}
@RabbitListener(id = "listener", queues = {
"#{receiver.queues()}" }, containerFactory = "myRabbitListenerContainerFactory")
public void listen(QueueMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
MessageHeaders headers) {
//process message
}
[1]: https://www.rabbitmq.com/event-exchange.html