Выполните действие при первой попытке повтора, чтобы сохранить сообщение в базе данных. - PullRequest
0 голосов
/ 11 мая 2018

Я пытаюсь сохранить каждое сообщение, поступающее в очередь RabbitMQ, в базу данных, только для регистрации.После этого сообщение необходимо обработать в обычном режиме.

Проблема заключается в том, что в этой очереди настроена политика повторов с параметром RetryOperationsInterceptor, и каждый раз, когда при обработке сообщения возникает какая-либо ошибка, сообщение помещается в очередь иобработан снова.Логика для сохранения сообщения находится в Слушателе, который читает очередь, поэтому вместо одного сообщения, сохраненного в базе данных, у меня есть 3 (количество попыток, которые я настроил).

См. Мой RetryOperationsInterceptor:

@Bean
public RetryOperationsInterceptor defaultRetryOperationsInterceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(3) 
            .backOffOptions(2000, 2.0, 10000)
            .build();
}

Фабрика контейнеров:

@Bean(name = FACTORY_CONTAINER_NAME)
public SimpleRabbitListenerContainerFactory factoryQueueExample(ConnectionFactory connectionFactory,
                                                                        RetryOperationsInterceptor defaultRetryOperationsInterceptor) {

    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(getMessageConverter());
    factory.setDefaultRequeueRejected(false);

    Advice[] adviceChain = {defaultRetryOperationsInterceptor};
    factory.setAdviceChain(adviceChain);

    return factory;
}

Прослушиватель очереди:

@Slf4j
@Component
@AllArgsConstructor
public class MessageListener {

    private final MessageRepository messageRepository;
    private final MessageService messageService;

    @RabbitListener(queues = MessageConfiguration.QUEUE,
        containerFactory = MessageConfiguration.FACTORY_CONTAINER_NAME)
    public void process(SomeMessage someMessage) {

        messageRepository.save(someMessage); // transform to a entity and save on database
        messageService.process(someMessage); // process message
    }
}

Не знаю, является ли это важной информацией, нос этой очередью также связан DLQ.После повторных попыток сообщение отправляется в очередь DLQ.

Моя идея - найти в перехватчике Retry что-то, что могло бы вызвать службу с первой попытки, чтобы сохранить сообщение только один раз.

Я открыт для других идей, чтобы решить эту проблему, например, сохранить номер попытки с сообщением, просто чтобы показать, что это не повторяющееся сообщение, сохраняемое в базе данных, а то же сообщение в другомпопытка.

Ответы [ 2 ]

0 голосов
/ 11 мая 2018

Прежде всего, я хотел бы поблагодарить Артема и Гарри , помогающих мне решить мою проблему.

Существуют, по крайней мере, четыре решения:

  1. Решение Advice (см. ответ Гарри )
  2. Счетчик от RetrySynchronizationManager
  3. Решение messageId
  4. Использовать состояние RetryOperationsInterceptor

Счетчик из опции RetrySynchronizationManager может быть легко использован в моем случае:

int attempts = RetrySynchronizationManager.getContext().getRetryCount();
boolean canSaveOnDatabase = attempts == 0;

Параметр messageId необходим отправителю для отправки идентификатора, уникально идентифицирующего это сообщение. Таким образом, я мог бы сохранить и использовать эту информацию в базе данных, чтобы узнать, сохранилось ли сообщение. Это может быть настроено с помощью MessagePostProcessor при вызове некоторого метода send из RabbitTemplate.

Последний вариант с указанием состояния RetryOperationsInterceptor Я мог бы просто поискать заголовок @Header(AmqpHeaders.REDELIVERED) boolean redelivered в RabbitListener. Эта опция не работает, если вы используете RetryOperationsInterceptor без сохранения состояния (в моем случае), и для этой опции также требуется messageId.

0 голосов
/ 11 мая 2018

Обратите внимание, как применить повторную попытку:

Advice[] adviceChain = {defaultRetryOperationsInterceptor};
factory.setAdviceChain(adviceChain);

Так же, как вы можете написать свой MethodInterceptor для записи в БД.И когда вы определяете этот пользовательский совет в порядке от до , что defaultRetryOperationsInterceptor, сохранение в БД будет вызываться только один раз, просто потому, что такая операция произойдет вне повтора и даже раньше.

Подробнее о АОП см. В документации: https://docs.spring.io/spring/docs/5.0.6.RELEASE/spring-framework-reference/core.html#aop

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...