Проблема холостого хода каналов RabbitMQ | Как восстановить неупакованные сообщения AMQP | Javaclient потребитель - PullRequest
2 голосов
/ 25 марта 2020

QueueConnectionsChannel inside connection Я являюсь потребителем rabbitmq и использую spring-amqp. Прямо сейчас, когда я go в администраторе, все соединения показывают работоспособность, но каналы внутри них все бездействуют (Prefetch: 250, Unacked: 250). Не могли бы вы помочь? Как правильно использовать эту предварительную выборку? Нужно ли закрывать соединения? Как я могу увеличить каналы на соединение. Прямо сейчас есть только один канал на соединение. Ниже приведен фрагмент конфигурации кода. Я использую, если конфигурация amqp box spring для большинства вещей. Также я использую настраиваемый обработчик сообщений rabbitmq для подтверждения или отмены сообщений.

<!-- RabbitMQ configuration -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.vhosts}" requested-heartbeat="${rabbitmq.requestedHeartBeat}"/>

    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" message-converter="rabbitJsonConverter"/>
    channel.Close
    <bean id="rabbitJsonConverter" class="rabbitmq.messages.converter.CustomJackson2JsonMessageConverter">
        <property name="classMapper">
            <bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
                <property name="defaultType" value="rabbitmq.messages.custom.dto.CustomRabbitMQMessage"/>
            </bean>
        </property>
    </bean>

    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" requeue-rejected="true">
        <rabbit:listener queue-names="${rabbitmq.queuename}" ref="customRabbitMQMessageListener" method="onMessage"/>
    </rabbit:listener-container>

<bean id="customRabbitMQMessageListener" class="rabbitmq.messages.listener.CustomRabbitMQMessageListener" >
        <property name="customerAccountService" ref="customerAccountService" />
    </bean>

**Listener Code**
LOG.debug("***** LISTENING RABBITMQ MESSAGES START******");
        channel.basicRecover(true);
        try {
                boolean ack = performOperationsOnMessage(msg);
                if (ack) {
                    channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
                } else
                    channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
                LOG.debug("***** LISTENING RABBITMQ MESSAGES FINISHED******");
        } catch (Exception exp) {
            LOG.error("Exception occured during perform Change operation, RabbitMQ message: " + exp.getMessage(), exp);
            channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
        }

private boolean performOperationsOnMessage(Message msg) {
        RabbitMQMessage message = null;
            try {
                message = (RabbitMQMessage) rabbitJsonConverter.fromMessage(msg);
            } catch (MessageConversionException exp) {
                LOG.warn("Exception occurred during the conversion or any other issue", exp);
                return true;
            }
        if (message == null || message.getOperation() == null || message.getResource() == null || message.getResource().getUuid() == null) {
            LOG.warn("Received an empty message  or emptry operation or empty resource or empty uuid from queue ");
            return true;
        }
        if (message.getOperation().equals(RabbitMQMessage.RossoOperation.remove.name())) {
            return performRemoveOperation(message);
        }
        if (message.getOperation().equals(RabbitMQMessage.RossoOperation.change.name())) {
            return performChangeOperation(message);
        }
        return true;
    }

1 Ответ

2 голосов
/ 28 марта 2020

Таким образом, решение этой проблемы было в коде слушателя, который был настроен для подтверждения вручную. В логи c было несколько веток, из-за которых слушателю не удавалось распознать некоторые сообщения, и именно поэтому количество неупакованных каналов достигло предвыборки (250), в результате чего RabbitMQ прекратил отправку сообщений на каналы.

Исправлено: Как вы видите, обновленный код слушателя в вопросе никогда не оставляет сообщение без подтверждения. Также в отрицательном подтверждении channel.basicNack (msg.getMessageProperties (). GetDeliveryTag (), false, true), Requeue (последняя переменная в сигнатуре) должно быть true, чтобы сообщения могли помещаться в очередь обратно в ту же очередь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...