Как Ack / Nack с реактивным RabbitListener в Spring AMQP? - PullRequest
1 голос
/ 21 мая 2019

Я использую Spring AMQP 2.1.6 для потребления сообщений с RabbitListener, возвращающим Mono<Void>.Например:

@RabbitListener
public Mono<Void> myListener(MyMessage myMessage) {
    Mono<Void> mono = myService.doSomething(myMessage);
    return mono;
}

Чтение документации говорит:

Фабрика контейнера слушателя должна быть сконфигурирована с AcknowledgeMode.MANUAL, чтобы поток потребителя былне подтверждать сообщение;вместо этого при завершении асинхронной операции асинхронное завершение будет подтверждать или сбивать сообщение.

Таким образом, я сконфигурировал фабрику контейнеров с AcknowledgeMode.MANUAL, но мне неясно, "асинхронное завершение"будет подтверждать или выводить сообщение после завершения асинхронной операции "означает, что это обрабатывается самим spring-amqp или если это то, что я должен сделать?Т.е. мне нужно ack / nack сообщение после вызова myService.doSomething(myMessage) или Spring AMQP автоматически его подтверждает, так как я возвращаю Mono (даже если AcknowledgeMode.MANUAL установлено)?

Если мне нужно вручную отправить ack или reject, каков идиоматичный способ сделать это неблокирующим способом при использовании RabbitListener?

1 Ответ

2 голосов
/ 21 мая 2019

Адаптер слушателя заботится о подтверждении, когда моно завершено.

См. AbstractAdaptableMessageListener.asyncSuccess() и asyncFailure().

РЕДАКТИРОВАТЬ

Я не реактор, но, насколько я могу судить, заполнение Mono<Void> ничего не делает, поэтому методы on...() никогда не вызываются.

Вы можете подтвердить доставку вручную, используя channel.basicAck илиbasicReject ...

@RabbitListener(queues = "foo")
public void listen(String in, Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    ...
}
...