Метод, возвращающий CompletableFuture, выполняется дважды при возникновении исключения - PullRequest
0 голосов
/ 07 июня 2018

У меня есть приложение Spring Boot, которое обрабатывает сообщения из очереди AWS SQS.Приложение отправляет запрос на 10 сообщений, инициирует от CountDownLatch до 10, затем передает каждое сообщение методу @Async, который возвращает CompletableFuture.CompletableFuture имеет thenAccept(), который удаляет сообщение, и whenComplete(), который регистрирует исключение, если оно есть, и уменьшает обратный отсчет защелки.Когда обратный отсчет завершается, извлекается следующая партия сообщений, и процесс начинается заново.Когда нет исключений, все это работает отлично.Однако, если исключение выдается в методе CompletableFuture, метод выполняется дважды перед возвратом к методу whenComplete().

Main:

public void readAllMessages(String sqsUrl, MessageConsumer messageConsumer) {
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsUrl).withMaxNumberOfMessages(10);
    List<Message> messages;
    try {
        do {
            messages = amazonSQS.receiveMessage(receiveMessageRequest).getMessages();
            if (CollectionUtils.isNotEmpty(messages)) {
                final CountDownLatch latch = new CountDownLatch(messages.size());
                messages.forEach(message -> {
                    try {
                        messageConsumer.processMessage(message)
                            .thenAccept(m -> {
                                 deleteMessage(sqsUrl, message);
                             })
                             .whenComplete((value, exception) -> {
                                 LOGGER.info("Processing complete for message {}", message.getMessageId());
                                 latch.countDown();
                                 if (exception != null) {
                                     exceptionLogger.logException(String.format("Couldn't process message. queue:%s. id:%s", sqsUrl, message.getMessageId()), exception);
                                 }
                             });
                    } catch (Throwable e) {
                        LOGGER.error("Refreshing tax rate for message {} failed with exception {} ", message.getBody(), e);
                    }
                });
                latch.await();
            } else {
                LOGGER.debug("Queue is empty: '{}'", sqsUrl);
            }
        } while (CollectionUtils.isNotEmpty(messages));
    } catch (InterruptedException e) {
        LOGGER.info("Thread interrupted, stopping.");
    }
}

MessageConsumer.processMessage

@XRayLogged(segmentName = "outdated_host_tax_rate_update")
@Async
@Transactional
@Override
public CompletableFuture<?> processMessage(Message message) {
    OutdatedTaxRateMessage taxRateMessage;
    try {
        taxRateMessage = objectMapper.readValue(message.getBody(), OutdatedTaxRateMessage.class);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    Long id = taxRateMessage.getHostId();
    LOGGER.info("Updating tax rate. hostId:'{}', messageId:'{}'", id, message.getMessageId());
    hostTaxRateRefreshingService.refreshHostTaxRate(id);
    LOGGER.info("Updated tax rate. hostId:'{}', messageId:'{}'", id, message.getMessageId());
    return CompletableFuture.completedFuture(null);
}

При возникновении исключения дважды регистрируется сообщение «Обновление ставки налога. HostId: ''», за которым следует один набор сообщений из whenComplete() block («Обработка завершена ...», «Не удалось обработать сообщение ...»)

Может кто-нибудь помочь мне понять, почему это происходит?

1 Ответ

0 голосов
/ 12 июня 2018

Причиной проблемы была определена пользовательская аннотация к методу.Эта аннотация предназначалась для передачи информации в AWS Xray, но она непреднамеренно выполняла аннотированный метод дважды при возникновении исключения.Этот механизм все еще разрабатывается, но, по крайней мере, мы определили виновника.Вопрос был обновлен, так как аннотация была удалена.

...