Действительно лучше применить логику повторения к вашей реактивной последовательности, аналогично тому, как вы это делаете с retryExponentialBackoff()
.Просто потому, что выполнение Reactive Streams не происходит в одном потоке, мы можем применить это Retrytemplate
для myListener()
.
. Прямо сейчас внутренняя логика выглядит так:
private static class MonoHandler {
static boolean isMono(Object result) {
return result instanceof Mono;
}
@SuppressWarnings("unchecked")
static void subscribe(Object returnValue, Consumer<? super Object> success,
Consumer<? super Throwable> failure) {
((Mono<? super Object>) returnValue).subscribe(success, failure);
}
}
Что Consumer<? super Throwable> failure
делает это:
private void asyncFailure(Message request, Channel channel, Throwable t) {
this.logger.error("Future or Mono was completed with an exception for " + request, t);
try {
channel.basicNack(request.getMessageProperties().getDeliveryTag(), false, true);
}
catch (IOException e) {
this.logger.error("Failed to nack message", e);
}
}
Итак, у нас нет никакого способа инициировать этот RetryTemplate
, но в то же время с явным basicNack()
мы имееместественная повторная попытка с повторной загрузкой того же сообщения от RabbitMQ назад.
Возможно, мы могли бы применить повторную попытку Reactor для этого Mono
внутренне, но это не похоже на то, что RetryOperationsInterceptor
можно просто преобразовать вMono.retry()
.
Итак, другими словами, RetryOperationsInterceptor
является неправильным способом для реактивной обработки.Явно используйте Mono.retry()
в своем собственном коде.
Вы можете использовать какой-либо общий метод утилит и применять его как Mono.transform(Function<? super Mono<T>, ? extends Publisher<V>> transformer)
всякий раз, когда у вас есть реактивный возврат для метода @RabbitListener
.