SQS получает недостающие сообщения из очереди - PullRequest
1 голос
/ 12 октября 2019

У меня настроена инфраструктура AWS, так что каждое обновление записи динамо-базы данных заканчивается в очереди SQS FIFO с включенной дедупликацией. У меня также есть тест, охватывающий этот сценарий, в котором я очищаю очередь (очередь может получать обновления от других тестов в костюме. Чтобы избежать необходимости опрашивать большое количество сообщений перед получением правильных сообщений, я очищаю очередь перед запуском теста) и обновите Dynamo Db и убедитесь, что эти записи получены при опросе очереди. Этот тест нестабилен, а иногда он не проходит, потому что все отправленные мной обновления не получены из очереди.

В очереди есть только один потребитель, который является тестом, который я написал. Таким образом, это не похоже на то, что есть другой потребитель, который потребляет эти сообщения.

Я проверил очередь через консоль AWS, и в конце теста она пуста и не содержит пропущенных сообщений по истечении времени тестаиз-за установленного значения TIMEOUT.

Моя конфигурация очереди в CDK

public Queue createSqsQueue() {
    return new Queue(this, "DynamoDbUpdateSqsQueue", QueueProps.builder()
            .withContentBasedDeduplication(true)
            .withFifo(true)
            .withQueueName("DynamoDbUpdateSqsQueue.fifo")
            .withReceiveMessageWaitTime(Duration.seconds(20))
            .build());
}

Мой код сообщения получения

private void assertExpectedDynamoDbUpdatesAreReceived() {
    List<String> expectedDynamoDbUpdates = getExpectedDynamoDbUpdates();
    List<String> actualDynamoDBUpdates = newArrayList();
    boolean allDynamoDbUpdatesReceived = false;
    stopWatch.start();
    while (!allDynamoDbUpdatesReceived && stopWatch.getTime() < TIMEOUT ) {
        List<String> receivedDynamoDbUpdates =
                AmazonSQSClientBuilder.standard().receiveMessage(queueUrl).getMessages().stream()
                        .map(this::processAndDelete)
                        .collect(Collectors.toList());
        actualDynamoDBUpdates.addAll(receivedDynamoDbUpdates);
        if(actualDynamoDBUpdates.containsAll(expectedDynamoDbUpdates)){
            allDynamoDbUpdatesReceived= true;
        }
    }
    stopWatch.stop();
    assert(allDynamoDbUpdatesReceived).isTrue();
}

1 Ответ

1 голос
/ 13 октября 2019

Проблема не в получении сообщений. Это было в очистке очереди. Согласно документации очереди на очистку (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-purge-queue.html)

Процесс удаления сообщения занимает до 60 секунд. Рекомендуется подождать 60 секунд независимо от размера очереди

Простодобавление ожидания 60 секунд перед отправкой обновлений устранило проблему.

...