У меня настроена инфраструктура AWS, так что каждое обновление записи динамо-базы данных заканчивается в очереди SQS FIFO с включенной дедупликацией. У меня также есть тест, охватывающий этот сценарий, в котором я очищаю очередь (очередь может получать обновления от других тестов в костюме. Чтобы избежать необходимости опрашивать большое количество сообщений перед получением правильных сообщений, я очищаю очередь перед запуском теста) и обновите Dynamo Db и убедитесь, что эти записи получены при опросе очереди. Этот тест нестабилен, а иногда он не проходит, потому что все отправленные мной обновления не получены из очереди.
В очереди есть только один потребитель, который является тестом, который я написал. Таким образом, это не похоже на то, что есть другой потребитель, который потребляет эти сообщения.
Я проверил очередь через консоль AWS, и в конце теста она пуста и не содержит пропущенных сообщений по истечении времени тестаиз-за установленного значения TIMEOUT.
Моя конфигурация очереди в CDK
public Queue createSqsQueue() {
return new Queue(this, "DynamoDbUpdateSqsQueue", QueueProps.builder()
.withContentBasedDeduplication(true)
.withFifo(true)
.withQueueName("DynamoDbUpdateSqsQueue.fifo")
.withReceiveMessageWaitTime(Duration.seconds(20))
.build());
}
Мой код сообщения получения
private void assertExpectedDynamoDbUpdatesAreReceived() {
List<String> expectedDynamoDbUpdates = getExpectedDynamoDbUpdates();
List<String> actualDynamoDBUpdates = newArrayList();
boolean allDynamoDbUpdatesReceived = false;
stopWatch.start();
while (!allDynamoDbUpdatesReceived && stopWatch.getTime() < TIMEOUT ) {
List<String> receivedDynamoDbUpdates =
AmazonSQSClientBuilder.standard().receiveMessage(queueUrl).getMessages().stream()
.map(this::processAndDelete)
.collect(Collectors.toList());
actualDynamoDBUpdates.addAll(receivedDynamoDbUpdates);
if(actualDynamoDBUpdates.containsAll(expectedDynamoDbUpdates)){
allDynamoDbUpdatesReceived= true;
}
}
stopWatch.stop();
assert(allDynamoDbUpdatesReceived).isTrue();
}