SpringAMQP - повтор / повторная отправка сообщений dlx - PullRequest
1 голос
/ 14 января 2020

Я пытаюсь использовать механизм повтора с использованием DLX. Итак, в основном я хочу отправить сообщение 3 раза, а затем остановить и оставить это сообщение остановленным в очереди dlx;

Что я сделал: создал WorkQueue, привязанный к WorkExchange, создал RetryQueue, привязанный к RetryExchange

WorkQueue -> установить x-dead-letter-exchange на RetryExchange

RetryQueue -> установить x-dead-letter-exchange на WorkExchange И x-message-ttl на 300000 мс (5 минут)

Итак, теперь, когда я отправляю какое-либо сообщение в WorkQueue, и оно терпит неудачу ... это сообщение отправляется в RetryQueue в течение 5 минут, а затем возвращается в WorkQueue ... но оно может продолжать отказывать, и я хотел бы остановить его после 3 попыток ...

Это возможно? Возможно ли установить RetryQueue попробовать 3 раза и после остановки?

спасибо.

1 Ответ

1 голос
/ 14 января 2020

Невозможно сделать это только в брокере.

Вы можете добавить код в свой слушатель - проверьте заголовок x-death, чтобы определить, сколько раз сообщение было повторено, и отбросьте / зарегистрируйте его. (и / или отправить его в третью очередь) в вашем слушателе, когда вы хотите отказаться.

РЕДАКТИРОВАТЬ

@SpringBootApplication
public class So59741067Application {

    public static void main(String[] args) {
        SpringApplication.run(So59741067Application.class, args);
    }

    @Bean
    public Queue main() {
        return QueueBuilder.durable("mainQueue")
                .deadLetterExchange("")
                .deadLetterRoutingKey("dlQueue")
                .build();
    }

    @Bean
    public Queue dlq() {
        return QueueBuilder.durable("dlQueue")
                .deadLetterExchange("")
                .deadLetterRoutingKey("mainQueue")
                .ttl(5_000)
                .build();
    }

    @RabbitListener(queues = "mainQueue")
    public void listen(String in,
            @Header(name = "x-death", required = false) List<Map<String, ?>> xDeath) {

        System.out.println(in + xDeath);
        if (xDeath != null && (long) xDeath.get(0).get("count") > 2L) {
            System.out.println("Given up on this one");
        }
        else {
            throw new AmqpRejectAndDontRequeueException("test");
        }
    }

}
...