У меня есть общая библиотека, которая обрабатывает все настройки для rabbitMQ, и я стремлюсь выработать общую стратегию обработки сообщений об ошибках после определенных попыток, и, что у меня на уме, есть обмен, который получает все сообщения об ошибках и будет различать каждое сообщение, имея заголовок с именем возникшей очереди. все это после ряда попыток и попыток.
Я понятия не имею, как реализовать этот подход, ваша помощь в достижении этого!
вот один из моих слушателей
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${swift.queue.driverOnlineStatus}", durable = "true"),
exchange = @Exchange(value = "${swift.queue.driverExchange}",
type = ExchangeTypes.HEADERS,
ignoreDeclarationExceptions = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "eventName", value = "${swift.queue.driverOnlineStatus}"),
},
key = ""))
и вот конфигурация
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(value = {ConnectionFactory.class})
public SimpleMessageListenerContainer simpleMessageListenerContainerForAsyncRabbit(ConnectionFactory connectionFactoryfactory,
@Autowired(required = false) ErrorHandler errorHandler,
RabbitTemplate rabbitTemplate) {
log.debug("creating new SimpleMessageListenerContainer ... for async rabbit: {}", rabbitProperties);
SimpleMessageListenerContainer containerF = new SimpleMessageListenerContainer(connectionFactoryfactory);
containerF.setConnectionFactory(connectionFactoryfactory);
if (errorHandler != null) {
containerF.setErrorHandler(errorHandler);
}
//listener retry
Interceptor retryListenerInterceptor = RetryInterceptorBuilder.stateless()
.maxAttempts(rabbitProperties.getListener().getSimple().getRetry().getMaxAttempts())
.backOffOptions(rabbitProperties.getListener().getSimple().getRetry().getInitialInterval(),rabbitProperties.getListener().getSimple().getRetry().getMultiplier(),rabbitProperties.getListener().getSimple().getRetry().getMaxInterval())
.recoverer(new RejectAndDontRequeueRecoverer())
.build();
/*
Note on the two below interceptors
use a non-transactional template for the DLQ
When retries reach the maximum number that massage is dead-lettered
*/
//listener try to republish to error exchange
/*Interceptor retryListenerErrorInterceptor = RetryInterceptorBuilder.stateless()
.maxAttempts(rabbitProperties.getListener().getSimple().getRetry().getMaxAttempts())
.backOffOptions(rabbitProperties.getListener().getSimple().getRetry().getInitialInterval(),rabbitProperties.getListener().getSimple().getRetry().getMultiplier(),rabbitProperties.getListener().getSimple().getRetry().getMaxInterval())
.recoverer(new RepublishMessageRecoverer(rabbitTemplate,innodevProp.getEvent().getErrorExchangeName()))
.build();
*/
//publisher retry
Interceptor retryPublisherInterceptor = RetryInterceptorBuilder.stateless()
.maxAttempts(rabbitProperties.getTemplate().getRetry().getMaxAttempts())
.backOffOptions(rabbitProperties.getTemplate().getRetry().getInitialInterval(),rabbitProperties.getTemplate().getRetry().getMultiplier(),rabbitProperties.getTemplate().getRetry().getMaxInterval())
.recoverer(new RepublishMessageRecoverer(rabbitTemplate,innodevProp.getEvent().getErrorExchangeName()))
.build();
containerF.setAdviceChain(retryListenerInterceptor,retryPublisherInterceptor);
return containerF;
}
Я не уверен, нужно ли нам определять SimpleMessageListenerContainer
Я думаю, что мы должны полагаться на @EnableRabbit
насколько я понимаю ваш подход, мы должны создать bean-компонент для retryListenerInterceptor
с RepublishMessageRecoverer
, который будет выполнять работу