Приложение использует java 10, spring amqp и rabbitmq.
В системе есть очередь недоставленных сообщений, куда мы отправляем некоторые сообщения (они не могут быть обработаны должным образом из-за недоступности базы данных).
На данный момент доступность базы данных проверяется каждые X секунд, и, если доступно только, мы перезаписываем сообщения в их исходную очередь.В противном случае мы ничего не делаем, и сообщения остаются в очереди недоставленных сообщений.
При повторной постановке в очередь на исходную очередь сообщения могут снова вернуться в очередь недоставленных сообщений и увидеть увеличение количества заголовков x-death.
По некоторым причинам, мы хотели бы обработать сообщения с недоставленными буквами, например, с числом> = 5 и переместить другие в очередь недоставленных сообщений.
Мне нужно сначала подтвердить сообщениечтобы проверить заголовок счетчика x-death, а затем отправить его в исходную очередь, если счетчик достаточно большой, иначе повторно поместить в очередь недоставленных сообщений.
Мне не удается перенаправить в очередь недоставленных сообщенийпотому что основной вход не в слушателе: бросание AmqpRejectAndDontRequeueException не работает, поскольку исключение не брошено в объекте слушателя rabbitmq.
Я попытался бросить исключение в методе receiveAndCallback, но это кажется не лучше:
rabbitTemplate.receiveAndReply(queueName, new ReceiveAndReplyCallback<Message, Object>() {
@Override
public Object handle(Message message) {
Long messageXdeathCount = null;
if (null != message.getMessageProperties() && null != message.getMessageProperties().getHeaders()) {
List<Map<String, ?>> xdeathHeader =
(List<Map<String, ?>>) message.getMessageProperties().getHeaders().get(
"x-death");
if (null != xdeathHeader && null != xdeathHeader.get(0)) {
messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
}
}
if (messageXdeathCount == null) {
messageXdeathCount = 0L;
}
if (messageXdeathCount >= 5) {
resendsMessage(message);
} else {
// this does not reject the message
throw new AmqpRejectAndDontRequeueException("rejected");
}
return null;
}
});
return receive;
}
После выполнения этого метода сообщение не отклоняется, как я ожидаю, и находится вне очереди (это hкак было подтверждено).
Вот объявление обмена и очереди:
@Bean
public Exchange exchange() {
TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
admin().declareExchange(exchange);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", EXCHANGE);
Queue queue = new Queue("queueName", true, false, false, args);
admin().declareQueue(queue);
Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
admin().declareBinding(binding);
return exchange;
}
Как можно отклонить сообщения в очереди недоставленных сообщений без использования исключения AmqpRejectAndDontRequeueException?Возможно ли для обмена установить x-dead-letter-exchange на self?
Спасибо за вашу помощь
ОБНОВЛЕНИЕ
Я пыталсяиначе, с каналом get и reject:
// exchange creation
@Bean
public Exchange exchange() throws IOException {
Connection connection = connectionFactory().createConnection();
Channel channel = channel();
channel.exchangeDeclare(EXCHANGE, ExchangeTypes.TOPIC, true, false, null);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", EXCHANGE);
channel.queueDeclare("queueName", true, false, false, args);
channel.queueBind("queueName", EXCHANGE, routingKey);
return exchange;
}
Message get and ack or reject:
GetResponse response = channel.basicGet(queueName, false);
Long messageXdeathCount = null;
if(null != response.getProps() && null != response.getProps().getHeaders()) {
List<Map<String, ?>> xdeathHeader =
(List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
if(null != xdeathHeader && null != xdeathHeader.get(0)) {
messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
}
}
if (messageXdeathCount == null) {
messageXdeathCount = 0L;
}
if (messageXdeathCount >= 5) {
MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
MessageProperties messageProps =
messagePropertiesConverter.toMessageProperties(response.getProps(),
response.getEnvelope(), "UTF-8");
resendsMessage(new Message(response.getBody(), messageProps));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
if(response.getProps().getHeaders().get("x-death") == null) {
response.getProps().getHeaders().put("x-death", new ArrayList<>());
}
if(((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0) == null) {
((List<Map<String, Object>>)response.getProps().getHeaders().get("x-death")).add(new HashMap<>());
}
((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0).put(
"count", messageXdeathCount + 1);
channel.basicReject(response.getEnvelope().getDeliveryTag(), true);
}
Сначала я понял, что это было довольно некрасиво, тогда сообщения не могут быть обновлены междуполучить и отклонить.Есть ли способ использовать channel.basicReject и обновить заголовок счетчика x-death?