У меня есть приложение Spring Boot, которое обрабатывает сообщения из очереди AWS SQS.Приложение отправляет запрос на 10 сообщений, инициирует от CountDownLatch
до 10, затем передает каждое сообщение методу @Async
, который возвращает CompletableFuture
.CompletableFuture
имеет thenAccept()
, который удаляет сообщение, и whenComplete()
, который регистрирует исключение, если оно есть, и уменьшает обратный отсчет защелки.Когда обратный отсчет завершается, извлекается следующая партия сообщений, и процесс начинается заново.Когда нет исключений, все это работает отлично.Однако, если исключение выдается в методе CompletableFuture
, метод выполняется дважды перед возвратом к методу whenComplete()
.
Main:
public void readAllMessages(String sqsUrl, MessageConsumer messageConsumer) {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsUrl).withMaxNumberOfMessages(10);
List<Message> messages;
try {
do {
messages = amazonSQS.receiveMessage(receiveMessageRequest).getMessages();
if (CollectionUtils.isNotEmpty(messages)) {
final CountDownLatch latch = new CountDownLatch(messages.size());
messages.forEach(message -> {
try {
messageConsumer.processMessage(message)
.thenAccept(m -> {
deleteMessage(sqsUrl, message);
})
.whenComplete((value, exception) -> {
LOGGER.info("Processing complete for message {}", message.getMessageId());
latch.countDown();
if (exception != null) {
exceptionLogger.logException(String.format("Couldn't process message. queue:%s. id:%s", sqsUrl, message.getMessageId()), exception);
}
});
} catch (Throwable e) {
LOGGER.error("Refreshing tax rate for message {} failed with exception {} ", message.getBody(), e);
}
});
latch.await();
} else {
LOGGER.debug("Queue is empty: '{}'", sqsUrl);
}
} while (CollectionUtils.isNotEmpty(messages));
} catch (InterruptedException e) {
LOGGER.info("Thread interrupted, stopping.");
}
}
MessageConsumer.processMessage
@XRayLogged(segmentName = "outdated_host_tax_rate_update")
@Async
@Transactional
@Override
public CompletableFuture<?> processMessage(Message message) {
OutdatedTaxRateMessage taxRateMessage;
try {
taxRateMessage = objectMapper.readValue(message.getBody(), OutdatedTaxRateMessage.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
Long id = taxRateMessage.getHostId();
LOGGER.info("Updating tax rate. hostId:'{}', messageId:'{}'", id, message.getMessageId());
hostTaxRateRefreshingService.refreshHostTaxRate(id);
LOGGER.info("Updated tax rate. hostId:'{}', messageId:'{}'", id, message.getMessageId());
return CompletableFuture.completedFuture(null);
}
При возникновении исключения дважды регистрируется сообщение «Обновление ставки налога. HostId: ''», за которым следует один набор сообщений из whenComplete()
block («Обработка завершена ...», «Не удалось обработать сообщение ...»)
Может кто-нибудь помочь мне понять, почему это происходит?