Я пытаюсь сохранить каждое сообщение, поступающее в очередь RabbitMQ, в базу данных, только для регистрации.После этого сообщение необходимо обработать в обычном режиме.
Проблема заключается в том, что в этой очереди настроена политика повторов с параметром RetryOperationsInterceptor
, и каждый раз, когда при обработке сообщения возникает какая-либо ошибка, сообщение помещается в очередь иобработан снова.Логика для сохранения сообщения находится в Слушателе, который читает очередь, поэтому вместо одного сообщения, сохраненного в базе данных, у меня есть 3
(количество попыток, которые я настроил).
См. Мой RetryOperationsInterceptor
:
@Bean
public RetryOperationsInterceptor defaultRetryOperationsInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.backOffOptions(2000, 2.0, 10000)
.build();
}
Фабрика контейнеров:
@Bean(name = FACTORY_CONTAINER_NAME)
public SimpleRabbitListenerContainerFactory factoryQueueExample(ConnectionFactory connectionFactory,
RetryOperationsInterceptor defaultRetryOperationsInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(getMessageConverter());
factory.setDefaultRequeueRejected(false);
Advice[] adviceChain = {defaultRetryOperationsInterceptor};
factory.setAdviceChain(adviceChain);
return factory;
}
Прослушиватель очереди:
@Slf4j
@Component
@AllArgsConstructor
public class MessageListener {
private final MessageRepository messageRepository;
private final MessageService messageService;
@RabbitListener(queues = MessageConfiguration.QUEUE,
containerFactory = MessageConfiguration.FACTORY_CONTAINER_NAME)
public void process(SomeMessage someMessage) {
messageRepository.save(someMessage); // transform to a entity and save on database
messageService.process(someMessage); // process message
}
}
Не знаю, является ли это важной информацией, нос этой очередью также связан DLQ.После повторных попыток сообщение отправляется в очередь DLQ.
Моя идея - найти в перехватчике Retry что-то, что могло бы вызвать службу с первой попытки, чтобы сохранить сообщение только один раз.
Я открыт для других идей, чтобы решить эту проблему, например, сохранить номер попытки с сообщением, просто чтобы показать, что это не повторяющееся сообщение, сохраняемое в базе данных, а то же сообщение в другомпопытка.