Spring amqp отклоняет сообщение вне слушателя - PullRequest
0 голосов
/ 22 ноября 2018

Приложение использует java 10, spring amqp и rabbitmq.

В системе есть очередь недоставленных сообщений, куда мы отправляем некоторые сообщения (они не могут быть обработаны должным образом из-за недоступности базы данных).

На данный момент доступность базы данных проверяется каждые X секунд, и, если доступно только, мы перезаписываем сообщения в их исходную очередь.В противном случае мы ничего не делаем, и сообщения остаются в очереди недоставленных сообщений.

При повторной постановке в очередь на исходную очередь сообщения могут снова вернуться в очередь недоставленных сообщений и увидеть увеличение количества заголовков x-death.

По некоторым причинам, мы хотели бы обработать сообщения с недоставленными буквами, например, с числом> = 5 и переместить другие в очередь недоставленных сообщений.

Мне нужно сначала подтвердить сообщениечтобы проверить заголовок счетчика x-death, а затем отправить его в исходную очередь, если счетчик достаточно большой, иначе повторно поместить в очередь недоставленных сообщений.

Мне не удается перенаправить в очередь недоставленных сообщенийпотому что основной вход не в слушателе: бросание AmqpRejectAndDontRequeueException не работает, поскольку исключение не брошено в объекте слушателя rabbitmq.

Я попытался бросить исключение в методе receiveAndCallback, но это кажется не лучше:

rabbitTemplate.receiveAndReply(queueName, new ReceiveAndReplyCallback<Message, Object>() {

        @Override
        public Object handle(Message message) {
            Long messageXdeathCount = null;
            if (null != message.getMessageProperties() && null != message.getMessageProperties().getHeaders()) {
                List<Map<String, ?>> xdeathHeader =
                        (List<Map<String, ?>>) message.getMessageProperties().getHeaders().get(
                        "x-death");
                if (null != xdeathHeader && null != xdeathHeader.get(0)) {
                    messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
                }
            }
            if (messageXdeathCount == null) {
                messageXdeathCount = 0L;
            }
            if (messageXdeathCount >= 5) {
                resendsMessage(message);
            } else {
                // this does not reject the message
                throw new AmqpRejectAndDontRequeueException("rejected");
            }
            return null;
        }
    });
    return receive;
}

После выполнения этого метода сообщение не отклоняется, как я ожидаю, и находится вне очереди (это hкак было подтверждено).

Вот объявление обмена и очереди:

@Bean
public Exchange exchange() {
    TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
    admin().declareExchange(exchange);
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange", EXCHANGE);
    Queue queue = new Queue("queueName", true, false, false, args);
    admin().declareQueue(queue);
    Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
    admin().declareBinding(binding);
    return exchange;
}

Как можно отклонить сообщения в очереди недоставленных сообщений без использования исключения AmqpRejectAndDontRequeueException?Возможно ли для обмена установить x-dead-letter-exchange на self?

Спасибо за вашу помощь

ОБНОВЛЕНИЕ

Я пыталсяиначе, с каналом get и reject:

// exchange creation
@Bean
public Exchange exchange() throws IOException {
    Connection connection = connectionFactory().createConnection();
    Channel channel = channel();
    channel.exchangeDeclare(EXCHANGE, ExchangeTypes.TOPIC, true, false, null);
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange", EXCHANGE);
    channel.queueDeclare("queueName", true, false, false, args);
    channel.queueBind("queueName", EXCHANGE, routingKey);
    return exchange;
}

Message get and ack or reject:

GetResponse response = channel.basicGet(queueName, false);
Long messageXdeathCount = null;
if(null != response.getProps() && null != response.getProps().getHeaders()) {
    List<Map<String, ?>> xdeathHeader =
            (List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
    if(null != xdeathHeader && null != xdeathHeader.get(0)) {
        messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
    }
}
if (messageXdeathCount == null) {
    messageXdeathCount = 0L;
}
if (messageXdeathCount >= 5) {
    MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    MessageProperties messageProps =
            messagePropertiesConverter.toMessageProperties(response.getProps(),
response.getEnvelope(), "UTF-8");
    resendsMessage(new Message(response.getBody(), messageProps));
    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
    if(response.getProps().getHeaders().get("x-death") == null) {
        response.getProps().getHeaders().put("x-death", new ArrayList<>());
    }
    if(((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0) == null) {
        ((List<Map<String, Object>>)response.getProps().getHeaders().get("x-death")).add(new HashMap<>());
    }
    ((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0).put(
            "count", messageXdeathCount + 1);
    channel.basicReject(response.getEnvelope().getDeliveryTag(), true);
}

Сначала я понял, что это было довольно некрасиво, тогда сообщения не могут быть обновлены междуполучить и отклонить.Есть ли способ использовать channel.basicReject и обновить заголовок счетчика x-death?

Ответы [ 2 ]

0 голосов
/ 23 ноября 2018

Я мог бы использовать основные методы канала:

GetResponse response = channel.basicGet(queueName, false);
Long messageXdeathCount = 0L;
if(null != response.getProps() && null != response.getProps().getHeaders()) {
    List<Map<String, ?>> xdeathHeader =
            (List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
    if(null != xdeathHeader && null != xdeathHeader.get(0)) {
        for (Map<String, ?> map : xdeathHeader) {
            Long count = (Long) map.get("count");
            messageXdeathCount += count == null ? 0L : count;
        }
    }
}
if (messageXdeathCount >= 5) {
    MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    MessageProperties messageProps = messagePropertiesConverter.toMessageProperties(response.getProps(), response.getEnvelope(), "UTF-8");
    resendsMessage(new Message(response.getBody(), messageProps));
    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
    channel.basicReject(response.getEnvelope().getDeliveryTag(), false);
}

проблема в части обновления моего вопроса была в последней строке:

channel.basicGet(queueName, true);

логическое значение указывает, должно ли сообщениебыть в очереди или нет: если не в очереди, он переходит к обмену буквами и увеличивает количество заголовков x-death, как и ожидалось.Boolean обновлен до false, исправил проблему.

0 голосов
/ 22 ноября 2018

receiveAndReply() методы в настоящее время не обеспечивают контроль над подтверждением полученного сообщения.Не стесняйтесь открывать Запрос новой функции .

Вместо этого вы можете использовать контейнер слушателя, чтобы получить необходимую гибкость.

РЕДАКТИРОВАТЬ

Вы можете перейти к API rabbitmq ...

rabbitTemplate.execute(channel -> {
    // basicGet, basicPublish, ack/nack etc here
});
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...