Элегантный способ добавить несколько бинов @RabbitListener в ContainerFactory - PullRequest
0 голосов
/ 28 января 2019

Вот мой @Configuration

   @Bean
    public AmqpAdmin amqpAdmin()
    {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());

        DirectExchange dirExchange = new DirectExchange("evtExchange", true,
                false);

        rabbitAdmin.declareExchange(dirExchange);
        rabbitAdmin.declareQueue(processQueue);
        Binding processBinding = BindingBuilder.bind(processQueue)
                .to(dirExchange).with("rkey.process");
        rabbitAdmin.declareBinding(processBinding);

        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate()
    {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()
    {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        SimpleMessageListenerContainer container = factory
                .createListenerContainer();
        factory.setConcurrentConsumers(50);
        factory.setMaxConcurrentConsumers(100);
        container.setStartConsumerMinInterval(3000);
        container.setQueues(processQueue);
        factory.setAdviceChain(retryInterceptor());
        return factory;
    }

    @Bean
    public RetryOperationsInterceptor retryInterceptor()
    {
        return RetryInterceptorBuilder.stateless().maxAttempts(5)
                .backOffOptions(1000, 2.0, 10000).recoverer(new RejectAndDontRequeueRecoverer()).build();
    }

    @Bean
    public ProcessQueueListener processListener()
    {
        return new ProcessQueueListener();
    }

    @Bean
    public ProcessQueueListener processListener2()
    {
        return new ProcessQueueListener();
    }

    @Bean
    public ProcessQueueListener processListener3()
    {
        return new ProcessQueueListener();
    }

А вот @RabbitListener класс

@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "process")
public class ProcessQueueListener
{

    public ProcessQueueListener()
    {
    }

    @RabbitHandler
    void receiveMessage(String message)
    {
        // doSomething
    }

} 

Только когда я создаю экземпляр processListener(), processListener2() и processListener3()отдельно я начинаю видеть нескольких потребителей в администраторе RabbitMQ для очереди процесса, и каждый слушатель обрабатывает сообщения, в противном случае я просто вижу только одного потребителя, несмотря на указание setConcurrentConsumers()

Существует ли элегантный способобъявлять несколько слушателей по требованию, увеличивать и уменьшать в зависимости от необходимости.Или объявление нескольких @Bean s единственный вариант?Или я что-то не так делаю?

1 Ответ

0 голосов
/ 28 января 2019

Какую версию вы используете?

Я только что скопировал ваш контейнерный завод, и он прекрасно работает для меня (2.1.3) ...

enter image description here

Кстати, начиная с версии 2.0, вы можете добавить concurrency к @RabbitListener, и это заменит любое значение в фабрике контейнеров.

/**
 * Set the concurrency of the listener container for this listener. Overrides the
 * default set by the listener container factory. Maps to the concurrency setting of
 * the container type.
 * <p>For a
 * {@link org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
 * SimpleMessageListenerContainer} if this value is a simple integer, it sets a fixed
 * number of consumers in the {@code concurrentConsumers} property. If it is a string
 * with the form {@code "m-n"}, the {@code concurrentConsumers} is set to {@code m}
 * and the {@code maxConcurrentConsumers} is set to {@code n}.
 * <p>For a
 * {@link org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer
 * DirectMessageListenerContainer} it sets the {@code consumersPerQueue} property.
 * @return the concurrency.
 * @since 2.0
 */
String concurrency() default "";

Кроме того, не связано, но выне следует делать это rabbitAdmin.declareExchange(dirExchange) в объявлении bean-компонента - слишком рано в жизненном цикле контекста приложения подключиться к RabbitMQ.Добавьте обмен, очередь и привязку как @Bean s, и администратор найдет и объявит их автоматически.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...