как сделать общий обмен мертвыми буквами во всех моих очередях - PullRequest
0 голосов
/ 30 июня 2018

У меня есть общая библиотека, которая обрабатывает все настройки для rabbitMQ, и я стремлюсь выработать общую стратегию обработки сообщений об ошибках после определенных попыток, и, что у меня на уме, есть обмен, который получает все сообщения об ошибках и будет различать каждое сообщение, имея заголовок с именем возникшей очереди. все это после ряда попыток и попыток.

Я понятия не имею, как реализовать этот подход, ваша помощь в достижении этого!

вот один из моих слушателей

    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${swift.queue.driverOnlineStatus}", durable = "true"),
        exchange = @Exchange(value = "${swift.queue.driverExchange}",
            type = ExchangeTypes.HEADERS,
            ignoreDeclarationExceptions = "true"),
        arguments = {
            @Argument(name = "x-match", value = "all"),
            @Argument(name = "eventName", value = "${swift.queue.driverOnlineStatus}"),
        },
        key = ""))

и вот конфигурация

@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(value = {ConnectionFactory.class})
public SimpleMessageListenerContainer simpleMessageListenerContainerForAsyncRabbit(ConnectionFactory connectionFactoryfactory,
                                                                           @Autowired(required = false) ErrorHandler errorHandler,
                                                                           RabbitTemplate rabbitTemplate) {
    log.debug("creating new SimpleMessageListenerContainer ... for async rabbit: {}", rabbitProperties);
    SimpleMessageListenerContainer containerF = new SimpleMessageListenerContainer(connectionFactoryfactory);
    containerF.setConnectionFactory(connectionFactoryfactory);
    if (errorHandler != null) {
        containerF.setErrorHandler(errorHandler);
    }
    //listener retry
    Interceptor retryListenerInterceptor = RetryInterceptorBuilder.stateless()
            .maxAttempts(rabbitProperties.getListener().getSimple().getRetry().getMaxAttempts())
            .backOffOptions(rabbitProperties.getListener().getSimple().getRetry().getInitialInterval(),rabbitProperties.getListener().getSimple().getRetry().getMultiplier(),rabbitProperties.getListener().getSimple().getRetry().getMaxInterval())
            .recoverer(new RejectAndDontRequeueRecoverer())
            .build();

    /*
        Note on the two below interceptors
       use a non-transactional template for the DLQ
     When retries reach the maximum number that massage is dead-lettered
     */

    //listener try to republish to error exchange
    /*Interceptor retryListenerErrorInterceptor = RetryInterceptorBuilder.stateless()
            .maxAttempts(rabbitProperties.getListener().getSimple().getRetry().getMaxAttempts())
            .backOffOptions(rabbitProperties.getListener().getSimple().getRetry().getInitialInterval(),rabbitProperties.getListener().getSimple().getRetry().getMultiplier(),rabbitProperties.getListener().getSimple().getRetry().getMaxInterval())
            .recoverer(new RepublishMessageRecoverer(rabbitTemplate,innodevProp.getEvent().getErrorExchangeName()))
            .build();
    */

    //publisher retry
    Interceptor retryPublisherInterceptor = RetryInterceptorBuilder.stateless()
            .maxAttempts(rabbitProperties.getTemplate().getRetry().getMaxAttempts())
            .backOffOptions(rabbitProperties.getTemplate().getRetry().getInitialInterval(),rabbitProperties.getTemplate().getRetry().getMultiplier(),rabbitProperties.getTemplate().getRetry().getMaxInterval())
            .recoverer(new RepublishMessageRecoverer(rabbitTemplate,innodevProp.getEvent().getErrorExchangeName()))
            .build();

    containerF.setAdviceChain(retryListenerInterceptor,retryPublisherInterceptor);
    return containerF;
}

Я не уверен, нужно ли нам определять SimpleMessageListenerContainer Я думаю, что мы должны полагаться на @EnableRabbit

насколько я понимаю ваш подход, мы должны создать bean-компонент для retryListenerInterceptor с RepublishMessageRecoverer, который будет выполнять работу

Ответы [ 2 ]

0 голосов
/ 02 июля 2018

Я обнаружил, что подпружиненный башмак имеет встроенную поддержку повтора для простого контейнера, добавив следующие свойства:

spring.rabbitmq.listener.simple.retry.enabled=true

это включит повторную попытку, если вы не установили другие свойства, связанные с повторной попыткой, по умолчанию. Кроме того, вы должны убедиться, что вы отключили функцию очереди

spring.rabbitmq.listener.simple.default-requeue-rejected=false

Теперь вы должны определить RepublishMessagRecoverer, добавив следующий код

@BeanmatchIfMissing = true)
public RepublishMessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
    return new RepublishMessageRecoverer(rabbitTemplate,"error-exchange");
}

@Bean
public RetryOperationsInterceptor rabbitRetryInterceptor(RabbitTemplate rabbitTemplate) {
    log.debug("creating retry operation for rabbitmq ");
    return RetryInterceptorBuilder.stateless()
            .recoverer(republishMessageRecoverer(rabbitTemplate))
            .build();
}

теперь все контейнеры по умолчанию будут иметь эту конфигурацию, и, если повторная попытка исчерпана, они будут отправлены на обмен, определенный в retryInterceptor с этими заголовками

public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
public static final String X_EXCEPTION_MESSAGE = "x-exception-message";  
public static final String X_ORIGINAL_EXCHANGE = "x-original-exchange";
public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";

Дополнительное примечание: если вы хотите иметь другое поведение для других очередей, вы можете определить другую listenerContainer и назначить ее слушателю

** Пример: **

@Bean
RabbitListenerContainerFactory dispatchConnection(ConnectionFactory connectionFactory){
    System.out.println("creating the container");
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setDefaultRequeueRejected(true);

    return factory;
}

и присвойте контейнеру @RabbitListener, как показано в примере ниже

@RabbitListener(containerFactory ="beanListenerContainerName",  queues = "queueName")

Я также хотел бы поблагодарить Гари Рассела за его вдохновение своим ответом.

0 голосов
/ 30 июня 2018

RepublishMessageRecoverer делает именно то, что вы хотите. Когда попытки доставки исчерпаны, он отправляет сообщение DLX со следующими дополнительными заголовками:

public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
public static final String X_ORIGINAL_EXCHANGE = "x-original-exchange";
public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";

Вы можете либо направить все сообщения в одну очередь, либо связать DLQ с исходным ключом (ключами) маршрутизации.

См. документацию .

RepublishMessageRecoverer публикует сообщение с дополнительной информацией в заголовках сообщений, таких как сообщение об исключении, трассировка стека, исходный ключ обмена и ключ маршрутизации. Дополнительные заголовки могут быть добавлены путем создания подкласса и переопределения additionalHeaders(). DeliveryMode (или любые другие свойства) также можно изменить в методе additionalHeaders():

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...