Экспоненциальный откат для бизнес-исключений при использовании реактивного Spring-AMQP? - PullRequest
1 голос
/ 22 мая 2019

Я использую Spring AMQP 2.1.6 и Spring Boot 2.1.5 и ищу рекомендованный способ настройки spring-amqp для повторения бизнес-исключений для реактивных компонентов (Mono) с экспоненциальным откатом. Например:

@RabbitListener
public Mono<Void> myListener(MyMessage myMessage) {
    Mono<Void> mono = myService.doSomething(myMessage);
    return mono;
}

Я бы хотел, чтобы spring-amqp повторил попытку автоматически, если doSomething вернет ошибку. Обычно это можно настроить для блокировки RabbitListener при настройке фабрики контейнеров:

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setAdviceChain(retryInterceptor(..));

Где retryInterceptor может быть определено так:

private static RetryOperationsInterceptor retryInterceptor(long backoffInitialInterval, double backoffMultiplier, long backoffMaxInterval, int maxAttempts) {
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(backoffInitialInterval);
    backOffPolicy.setMultiplier(backoffMultiplier);
    backOffPolicy.setMaxInterval(backoffMaxInterval);

    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy((new SimpleRetryPolicy(maxAttempts)));
    retryTemplate.setBackOffPolicy(backOffPolicy);

    StatelessRetryOperationsInterceptorFactoryBean bean = new StatelessRetryOperationsInterceptorFactoryBean();
    bean.setRetryOperations(retryTemplate);
    return bean.getObject();
}

Но цепочка рекомендаций, похоже, не используется для реактивного RabbitListener. Вероятно, это потому, что, если я правильно понимаю, RetryTemplate / ExponentialBackOffPolicy фактически блокирует поток.

В качестве обходного пути я, конечно, мог бы сделать что-то вроде (переключиться на Kotlin, потому что это немного проще):

@RabbitListener
fun myListener(MyMessage myMessage) : Mono<Void> {
    return myService.doSomething(myMessage)
                    .retryExponentialBackoff(10, Duration.ofMillis(100), Duration.ofSeconds(5)) { ctx ->
            log.info("Caught exception ${ctx.exception()}")
        }
}

Но я бы хотел, чтобы эта логика повторения применялась ко всем экземплярам Mono, возвращаемым из RabbitListener. Возможно ли что-то подобное, или вы должны настроить это по-другому при использовании реактивных последовательностей из проектного реактора с spring-amqp?

1 Ответ

1 голос
/ 22 мая 2019

Действительно лучше применить логику повторения к вашей реактивной последовательности, аналогично тому, как вы это делаете с retryExponentialBackoff().Просто потому, что выполнение Reactive Streams не происходит в одном потоке, мы можем применить это Retrytemplate для myListener().

. Прямо сейчас внутренняя логика выглядит так:

private static class MonoHandler {

    static boolean isMono(Object result) {
        return result instanceof Mono;
    }

    @SuppressWarnings("unchecked")
    static void subscribe(Object returnValue, Consumer<? super Object> success,
            Consumer<? super Throwable> failure) {

        ((Mono<? super Object>) returnValue).subscribe(success, failure);
    }

}

Что Consumer<? super Throwable> failure делает это:

private void asyncFailure(Message request, Channel channel, Throwable t) {
    this.logger.error("Future or Mono was completed with an exception for " + request, t);
    try {
        channel.basicNack(request.getMessageProperties().getDeliveryTag(), false, true);
    }
    catch (IOException e) {
        this.logger.error("Failed to nack message", e);
    }
} 

Итак, у нас нет никакого способа инициировать этот RetryTemplate, но в то же время с явным basicNack() мы имееместественная повторная попытка с повторной загрузкой того же сообщения от RabbitMQ назад.

Возможно, мы могли бы применить повторную попытку Reactor для этого Mono внутренне, но это не похоже на то, что RetryOperationsInterceptor можно просто преобразовать вMono.retry().

Итак, другими словами, RetryOperationsInterceptor является неправильным способом для реактивной обработки.Явно используйте Mono.retry() в своем собственном коде.

Вы можете использовать какой-либо общий метод утилит и применять его как Mono.transform(Function<? super Mono<T>, ? extends Publisher<V>> transformer) всякий раз, когда у вас есть реактивный возврат для метода @RabbitListener.

...