Использование очереди в зависимости от количества потребителей в Spring AMQP - PullRequest
2 голосов
/ 27 мая 2019

Я хочу, чтобы очередь использовалась только одним подписчиком за раз.Поэтому, если один подписчик удалится, у другого будет возможность подписаться.

Я ищу правильный способ сделать это в Spring AMQP.Я сделал это на чистой Java, основываясь на примере на сайте RabbitMQ.Я пассивно объявляю очередь, проверяю количество ее потребителей, если оно равно 0, затем начинаю потреблять ее.

Вот код.

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

int count = channel.queueDeclarePassive(QUEUE_NAME).getConsumerCount();

System.out.println("count is "+count);
if (count == 0) {
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} else{
    System.out.println("subscribed by some other processor(s)");
}

Я также могу проверить количество подписчиков веснойAMQP таким образом.Но уже слишком поздно, потому что он уже прослушивает очередь.

@RabbitListener(queues = "q1")
public void receivedMessageQ1(String message, Channel channel){
    try {
        int q1 = channel.queueDeclarePassive("q1").getConsumerCount();
        // do something.
    } catch (IOException e) {
        System.out.println("exception occurred");
    }
}

В двух словах, я хочу использовать очередь, основываясь на количестве ее потребителей.Я надеюсь, что я ясен.

1 Ответ

2 голосов
/ 28 мая 2019

Установите флаг exclusive на @RabbitListener;RabbitMQ позволит использовать только один экземпляр.Другие экземпляры будут пытаться прослушивать каждые 5 секунд (по умолчанию).Чтобы увеличить интервал, установите для фабрики контейнеров recoveryBackOff.

@SpringBootApplication
public class So56319999Application {

    public static void main(String[] args) {
        SpringApplication.run(So56319999Application.class, args);
    }

    @RabbitListener(queues = "so56319999", exclusive = true)
    public void listen (String in) {

    }

    @Bean
    public Queue queue() {
        return new Queue("so56319999");
    }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...