RabbitMQ DefaultConsumer вызывает слишком много потребительских тегов - PullRequest
0 голосов
/ 19 января 2019

У меня есть клиентское приложение RabbitMQ, которое прослушивает определенную очередь. Клиент создает и экземпляр DefaultConsumer и реализует метод handleDelivery. Вот код

    protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    public void receiveMessages() {
        try {
//            channel.basicQos(pollCount);
            Message message = new Message();
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    String response = new String(body, "UTF-8");
                    if (response != null) {
                        message.setId(NUID.nextGlobal());
                        message.setPayload(response);
                        message.setDeliveryTag(deliveryTag);
                        messages.add(message);
                        logger.info("Message received: ", message.getPayload());
                    }
                }
            };
            logger.debug("**********Channel status: " + channel.isOpen());
            channel.basicConsume(queueName, false, consumer);
        } catch (Exception e) {
            logger.error("Exception while getting messages from Rabbit ", e);

        }
    }

Метод receiveMessages () часто вызывается через поток каждые 500 мс и выводит сообщения в другой список для использования. Из-за этого опроса на receiveMessages () я заметил, что потребительские теги непрерывно создаются и растут при просмотре через консоль кролика, как на картинке. Нормально ли видеть эти растущие потребительские теги? enter image description here

Ответы [ 2 ]

0 голосов
/ 19 января 2019

Я наконец нашел рабочее решение. Как подчеркнул Люк Баккен, опрос не требуется. Я просто звоню receiveMesssages() только один раз. После этого мой потребитель получает обратные вызовы, когда сообщения публикуются в очереди.

 protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
 public void receiveMessages() {
    try {
        Message message = new Message();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            String response = new String(delivery.getBody(), "UTF-8");
            if (response != null) {
                message.setId(NUID.nextGlobal());
                message.setPayload(response);
                message.setDeliveryTag(deliveryTag);
                messages.add(message);
                logger.info("Message received: ", message.getPayload());
            };
        channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
    } catch (Exception e) {
        logger.error("Exception while getting messages from Rabbit ", e);
    }
}

На консоли кролика теперь отображается только 1 запись тега потребления в связанной очереди.

0 голосов
/ 19 января 2019

Нормально ли видеть эти растущие потребительские теги?

Нет, в вашем коде есть ошибка.Вам нужно либо просто использовать долгосрочного потребителя, либо вы должны отменить своего потребителя, когда закончите с ним.

Я не вижу необходимости "опрашивать" receiveMessages - просто дайте ему запуститьсясам по себе, и он будет добавлять сообщения в синхронизированную очередь, как вы ожидаете.


ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает список рассылки rabbitmq-users и только иногда отвечает на вопросы по StackOverflow.

...