Я нахожусь в процессе исследования AWS очередей SQS FIFO и создания некоторого прототипа. Однако мне трудно понять, как я могу извлечь конкретное сообщение c, которое было отправлено. Теперь пара вопросов:
- Я так понимаю, что при вызове
ReceiveMessageAsync
возвращается список сообщений. Должен ли я пройти oop через этот список и сопоставить свойство MessageId
со свойством моего исходного отправленного сообщения? - Если в очереди есть список необработанных сообщений, допустим, 15, и я отправляю новое сообщение, будет ли мое сообщение возвращено только с
ReceiveMessageAsync
, когда по крайней мере 6 сообщений будут удалены из очереди?
В настоящее время в моем прототипе я выполняю запрос SendMessageAsync
и сразу после этого выполняю a ReceiveMessageAsync
, чтобы получить обработанное сообщение. Именно здесь я прохожу oop через полученный список идентификаторов сообщений, чтобы получить свое сообщение, выполняю лог c над сообщением, а затем запрашиваю удаление сообщения из очереди. Это logi c правильный?
var sqsClient = new AmazonSQSClient(RegionEndpoint.EUWest1);
var sendQueueUrl = $"{ConfigurationManager.AppSettings["AWSServer"]}{ConfigurationManager.AppSettings["SQSSend"]}";
var deduplicationId = "fc1e026d-4a04-4cdf-b0b0-16bc78dde19c"; //Guid.NewGuid().ToString();
var sqsMessageRequest = new SendMessageRequest
{
QueueUrl = sendQueueUrl,
MessageGroupId = "testGroup",
MessageDeduplicationId = deduplicationId,
MessageBody = "{\"message\":\"hello\"}"
};
try
{
var sendMessageResponse = await sqsClient.SendMessageAsync(sqsMessageRequest);
var receiveQueueUrl = $"{ConfigurationManager.AppSettings["AWSServer"]}{ConfigurationManager.AppSettings["SQSReceive"]}";
var receiveMessageRequest = new ReceiveMessageRequest
{
AttributeNames = { "All" },
MaxNumberOfMessages = 10,
MessageAttributeNames = { "All" },
QueueUrl = receiveQueueUrl,
WaitTimeSeconds = 20
};
bool messagesFound = false;
while (!messagesFound)
{
var receiveMessageResponse = await sqsClient.ReceiveMessageAsync(receiveMessageRequest);
if (receiveMessageResponse.HttpStatusCode != System.Net.HttpStatusCode.OK)
Console.WriteLine("Failed request to receive message\n");
else
{
foreach (var message in receiveMessageResponse.Messages)
{
if (message.MessageId != sendMessageResponse.MessageId)
continue;
messagesFound = true;
/*process message further and delete afterwards*/
var deleteMessageRequest = new DeleteMessageRequest($"{ConfigurationManager.AppSettings["AWSServer"]}{ConfigurationManager.AppSettings["SQSReceive"]}", message.ReceiptHandle);
var deleteMessageResponse = await sqsClient.DeleteMessageAsync(deleteMessageRequest);
}
}
}
}
catch (Exception ex)
{
throw new Exception("SendMessageAsync: " + ex.Message);
}
finally
{
sqsClient.Dispose();
}