Spring AMQP: прослушивание динамически создаваемой очереди - PullRequest
0 голосов
/ 08 января 2020

У меня есть rabbitListener, который непрерывно асинхронно прослушивает пользовательские сообщения из очереди "user-messages". Все в порядке, пока очередь не загружена массовыми сообщениями. Когда массовые сообщения публикуются в очереди, сообщения того же пользователя обрабатываются первыми, поэтому сообщения других пользователей ожидают своей очереди.

Я не могу использовать Очередь приоритетов , потому что все пользователи имеют одинаковый приоритет. Поэтому я хочу создать новые очереди и прослушивать их в runtime . Все очереди будут недолговечны, как только сообщения будут использованы. (очередь будет удалена)

При просмотре я обнаружил, что очередь может быть создана динамически с помощью RabbitAdmin . Но проблемы заключаются в том, что

  1. Как заставить слушателя прослушивать новую коротко-живую очередь (TTL), созданную во время выполнения?
  2. Как заставить слушателя перестать слушать удаленная очередь (по истечении времени TTL), чтобы избежать исключений?

В настоящее время я использую SimpleMessageListenerContainerFactory. У меня нет проблем, чтобы использовать DirectMessageListenerContainer, а также. Мое единственное беспокойство - , как сообщить о создании и удалении динамической c очереди слушателю . Думая о https://www.rabbitmq.com/event-exchange.html (плагин обмена событиями).

Есть ли способ, с помощью которого spring-amqp поддерживает start / stop динамические очереди c. Заранее спасибо.

    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(config.getConnectionFactory());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(3);
        return factory;
    }

    @RabbitListener(id = "listener", queues = {
            "#{receiver.queues()}" }, containerFactory = "myRabbitListenerContainerFactory")
    public void listen(QueueMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
            MessageHeaders headers) {
         //process message
    }


  [1]: https://www.rabbitmq.com/event-exchange.html
...