Правильно ли я отправляю и получаю сообщения, используя очередь AWS SQS FIFO? - PullRequest
1 голос
/ 07 августа 2020

Я нахожусь в процессе исследования AWS очередей SQS FIFO и создания некоторого прототипа. Однако мне трудно понять, как я могу извлечь конкретное сообщение c, которое было отправлено. Теперь пара вопросов:

  1. Я так понимаю, что при вызове ReceiveMessageAsync возвращается список сообщений. Должен ли я пройти oop через этот список и сопоставить свойство MessageId со свойством моего исходного отправленного сообщения?
  2. Если в очереди есть список необработанных сообщений, допустим, 15, и я отправляю новое сообщение, будет ли мое сообщение возвращено только с ReceiveMessageAsync, когда по крайней мере 6 сообщений будут удалены из очереди?

В настоящее время в моем прототипе я выполняю запрос SendMessageAsync и сразу после этого выполняю a ReceiveMessageAsync, чтобы получить обработанное сообщение. Именно здесь я прохожу oop через полученный список идентификаторов сообщений, чтобы получить свое сообщение, выполняю лог c над сообщением, а затем запрашиваю удаление сообщения из очереди. Это logi c правильный?

var sqsClient = new AmazonSQSClient(RegionEndpoint.EUWest1);
            var sendQueueUrl = $"{ConfigurationManager.AppSettings["AWSServer"]}{ConfigurationManager.AppSettings["SQSSend"]}";
            var deduplicationId = "fc1e026d-4a04-4cdf-b0b0-16bc78dde19c"; //Guid.NewGuid().ToString();

        var sqsMessageRequest = new SendMessageRequest
        {
            QueueUrl = sendQueueUrl,
            MessageGroupId = "testGroup",
            MessageDeduplicationId = deduplicationId,
            MessageBody = "{\"message\":\"hello\"}"
        };
        try
        {
            var sendMessageResponse = await sqsClient.SendMessageAsync(sqsMessageRequest);

            var receiveQueueUrl = $"{ConfigurationManager.AppSettings["AWSServer"]}{ConfigurationManager.AppSettings["SQSReceive"]}";
            var receiveMessageRequest = new ReceiveMessageRequest
            {
                AttributeNames = { "All" },
                MaxNumberOfMessages = 10,
                MessageAttributeNames = { "All" },
                QueueUrl = receiveQueueUrl,
                WaitTimeSeconds = 20
            };

            bool messagesFound = false;

            while (!messagesFound)
            {
                var receiveMessageResponse = await sqsClient.ReceiveMessageAsync(receiveMessageRequest);

                if (receiveMessageResponse.HttpStatusCode != System.Net.HttpStatusCode.OK)
                    Console.WriteLine("Failed request to receive message\n");
                else
                {
                    foreach (var message in receiveMessageResponse.Messages)
                    {
                        if (message.MessageId != sendMessageResponse.MessageId)
                            continue;

                        messagesFound = true;
                        /*process message further and delete afterwards*/
                        var deleteMessageRequest = new DeleteMessageRequest($"{ConfigurationManager.AppSettings["AWSServer"]}{ConfigurationManager.AppSettings["SQSReceive"]}", message.ReceiptHandle);
                        var deleteMessageResponse = await sqsClient.DeleteMessageAsync(deleteMessageRequest);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            throw new Exception("SendMessageAsync: " + ex.Message);
        }
        finally
        {
            sqsClient.Dispose();
        }

1 Ответ

0 голосов
/ 07 августа 2020

Цикл по очереди для поиска определенного c элемента будет неэффективным для SQS в очереди FIFO, если это ваша цель.

На самом деле вам будет лучше смотреть на Virtual Queues который предложит способ использования SQS для выполнения сопоставления отправки и получения 1: 1.

Используя эту функцию, вы определяете виртуальную очередь для вашего сообщения c, потребитель будет возможность обрабатывать все сообщения (или технически потребителя из указанной c виртуальной очереди).

Принимая во внимание, что как очереди FIFO, так и стандартные очереди SQS, вам нужно будет обрабатывать все сообщения в очереди, которая ограничена 10 сообщениями в время. Это делает неэффективными попытки найти ваше конкретное сообщение c.

...