Spring AMQP Установить prefetchCount в Queue Bean - PullRequest
2 голосов
/ 27 мая 2020

Как мне установить prefetchCount для этого потребителя очереди?

@Bean
public Queue eventQueue(AmqpAdmin amqpAdmin) {
    Queue queue = QueueBuilder.durable(EVENT_QUEUE_NAME)
            ...
            .build();

    TopicExchange topicExchange = new TopicExchange(TOPIC_EXCHANGE, true, false);

    amqpAdmin.declareBinding(BindingBuilder
            .bind(queue)
            .to(topicExchange)
            .with(EVENT_ROUTING_KEY));

    return queue;
}

В документации отмечается, что prefetchCount это конфигурация контейнера, но установка ее на моем factory bean не работал, и значение по умолчанию было 250:

    @Bean
    public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPrefetchCount(10); // doesn't work; defaults to 250
        return factory;
    }

UPDATE

Согласно комментарию @ GaryRussell ниже, я тестировал значение по умолчанию rabbitListenerContainerFactory а также подтвердил, что моя конфигурация Spring Boot spring.rabbitmq.listener.simple.prefetch использовалась в AbstractRabbitListenerContainerFactoryConfigurer. Однако, когда я смотрю на потребителей очереди в RabbitMQ, я вижу, что очереди, которые я определяю с помощью настройки контейнера по умолчанию, по-прежнему имеют prefetchCount 250:

rabbit admin shows consumer prefetch 250

Я использую админку RabbitMQ как источник истины. Я не думаю, что это ложь, потому что у меня есть куча динамических c очередей, экземпляры которых создаются с помощью настраиваемых контейнеров, и у них действительно есть нестандартные (правильные) prefetchCount s. Я также проверил при запуске контейнера Spring, что существует только один (ожидаемый) компонент rabbitListenerContainerFactory.

1 Ответ

2 голосов
/ 27 мая 2020

Prefetch - это не свойство очереди, это свойство потребителя.

Как выглядит ваш слушатель?

Вы используете нестандартное имя для фабрики контейнеров.

Вам нужно либо добавить свойство containerFactory к @RabbitListener, либо вам нужно переименовать свой bean-компонент в rabbitListenerContainerFactory (переопределив заводскую настройку Boot @Bean)

Также

amqpAdmin.declareBinding(BindingBuilder

Вы не должны разговаривать с брокером в определении bean-компонента - это слишком рано.

Просто добавьте свою очередь, обмен и привязку как @Bean s, и вы также можете просто установить предварительную выборку в файл application.properties/yaml (если вы используете Spring Boot). Администратор найдет их и объявит при первом открытии соединения.

EDIT

Что-то еще происходит ...

@SpringBootApplication
public class So62049769Application {

    public static void main(String[] args) {
        SpringApplication.run(So62049769Application.class, args);
    }

    @Bean
    public Queue queue() {
        return new Queue("so62049769");
    }

    @RabbitListener(queues = "so62049769")
    public void listen(String in) {
        System.out.println(in);
    }

}
spring.rabbitmq.listener.simple.prefetch=42

enter image description here

...