Сообщение весенней загрузки rabbitmq не попадает в очередь - PullRequest
0 голосов
/ 20 октября 2018

Привет. Я пытаюсь запросить определенные сообщения, если выбрасывается определенное исключение, но для любых сбоев проверки я хочу, чтобы они направлялись прямо в очередь недоставленных сообщений.У меня есть соответствующие очереди и очереди мертвых писем включены.Я нахожу, что мои ошибки проверки получены в dlq, но другие ошибки постоянно находятся в нерабочем состоянии и постоянно повторяются, помимо установленных макс-попыток и множителя, есть идеи, почему это так?код ниже я использую релиз Spring boot 2.0.4

@RabbitListener(queues = "${queuename}")
    public void consume(final @Valid @Payload MyRequest myRequest) {
        if (method.fail()) {
          throw new RuntimeException("");
        }
    }

 @Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(jackson2Converter());
    factory.setValidator(amqpValidator());
    return factory;
}

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
    registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}

@Bean
public Validator amqpValidator() {
    return new OptionalValidatorFactoryBean();
}

@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}


@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory listenerContainerFactory =
            new SimpleRabbitListenerContainerFactory();
    listenerContainerFactory.setConnectionFactory(connectionFactory());
    listenerContainerFactory.setErrorHandler(new ConditionalRejectingErrorHandler(
            new MyErrorPayload()));
    listenerContainerFactory.setMessageConverter(jackson2JsonMessageConverter());
    return listenerContainerFactory;
}

 @Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitQueueHost);
    connectionFactory.setUsername(rabbitQueueUsername);
    connectionFactory.setPassword(rabbitQueuePassword);
    connectionFactory.setVirtualHost(rabbitQueueVirtualHost);
    return connectionFactory;
}



public class MyErrorPayload implements FatalExceptionStrategy {


@Override
public boolean isFatal(Throwable t) {
  if (t instanceof ListenerExecutionFailedException &&
        (t.getCause() instanceof MessageConversionException ||
         t.getCause() instanceof MethodArgumentNotValidException )
        ) {
      return true;
  }
    return false;
 }
}

application.yml (свойства)

spring:
  rabbitmq:
    host: localhost
    username: uu
    password: pp
    virtual-host: /
    listener:
      simple:
        default-requeue-rejected: false
        retry:
          enabled: true
          initial-interval: 2000
          multiplier: 1.5
          max-interval: 10000
          max-attempts: 3

1 Ответ

0 голосов
/ 20 октября 2018

Это потому, что вы не используете автоматическую настройку Boot для фабрики контейнеров.Таким образом, конфигурация повторов игнорируется.

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory listenerContainerFactory =
            new SimpleRabbitListenerContainerFactory();
    listenerContainerFactory.setConnectionFactory(connectionFactory());
    listenerContainerFactory.setErrorHandler(new ConditionalRejectingErrorHandler(
            new MyErrorPayload()));
    listenerContainerFactory.setMessageConverter(jackson2JsonMessageConverter());
    return listenerContainerFactory;
}

То же самое было верно для примера, на который @Barath ссылается в своем комментарии.

Вставьте конфигуратор в ваш фабричный метод и вызовите его;например, для этого образца ...

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
        SimpleRabbitListenerContainerFactoryConfigurer configurer) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setErrorHandler(errorHandler());
    return factory;
}

Если есть только один Бин конвертера сообщений, конфигуратор добавит это тоже.

Я обновил образец.

РЕДАКТИРОВАТЬ

Пользовательская политика повторов для выборочных исключений;следующее отключает повторную попытку для ValidationException, но повторяет все остальные.(Опять же, для примера приложения) ...

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
        SimpleRabbitListenerContainerFactoryConfigurer configurer, RabbitProperties properties) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setErrorHandler(errorHandler());
    ListenerRetry retryConfig = properties.getListener().getSimple().getRetry();
    if (retryConfig.isEnabled()) {
        RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
                ? RetryInterceptorBuilder.stateless()
                : RetryInterceptorBuilder.stateful());
        RetryTemplate retryTemplate = new RetryTemplate();
        Map<Class<? extends Throwable>, Boolean> retryableExceptions = Collections
                .singletonMap(ValidationException.class, false);
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(retryConfig.getMaxAttempts(),
                retryableExceptions, true, true); // retry all exceptions except Validation
        retryTemplate.setRetryPolicy(retryPolicy);
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(retryConfig.getInitialInterval().toMillis());
        backOffPolicy.setMaxInterval(retryConfig.getMaxInterval().toMillis());
        backOffPolicy.setMultiplier(retryConfig.getMultiplier());
        retryTemplate.setBackOffPolicy(backOffPolicy);
        builder.retryOperations(retryTemplate);
        builder.recoverer(new RejectAndDontRequeueRecoverer());
        factory.setAdviceChain(builder.build());
    }
    return factory;
}

Никаких сообщений никогда не ставится в очередь, поскольку у вас есть default-requeue-rejected: false.

...