Решение может зависеть от используемой вами абстракции:
Spring Cloud Streams
Я бы сделал это с помощью Spring Cloud StreamПроцессор , который обрабатывает сообщения и устанавливает routingKeyExpression .
Привязки:
=> theSourceQueue
=> myProcessor(message) consumes messages and setts routing key as a header
=> DestinationExchange
=> route 1 => theSourceQueue
=> route 2 => ?
Spring AMQP
import com.rabbitmq.client.Channel;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class Receiver {
@RabbitListener(queues = "my-messages")
public void receiveMessage(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
System.out.println("Received message: `" + payload + "`, deliveryTag: " + deliveryTag);
channel.basicNack(deliveryTag, false, true);
}
}
Клиент RabbitMQ Java
Вы также можете перейти на нижний уровень и использовать отрицательное подтверждение и повторную очередь :
В этом примере отклоняются два сообщения содин вызов брокеру (второй аргумент basicNack - это флаг с несколькими флагами):
GetResponse gr1 = channel.basicGet("some.queue", false);
GetResponse gr2 = channel.basicGet("some.queue", false);
channel.basicNack(gr2.getEnvelope().getDeliveryTag(), true, true);
Когда сообщение ставится в очередь, оно помещается на свою исходную позицию в своей очереди, если возможно.Если нет (из-за одновременных доставок и подтверждений от других потребителей, когда несколько потребителей совместно используют очередь), сообщение будет помещено в очередь ближе к началу очереди.