У меня есть архитектура, которая выглядит следующим образом:
- Как только сообщение отправляется в очередь SQS, задача ECS выбирает это сообщение и обрабатывает его.
- Какойозначает, что если в очередь отправлено X сообщений, задача X ECS будет запущена параллельно.Задача ECS может извлечь только одно сообщение (согласно моему коду выше)
Задача ECS использует контейнер dockerized
Python и использует boto3
SQS client дляполучить и проанализировать сообщение SQS:
sqs_response = get_sqs_task_data('<sqs_queue_url>')
sqs_message = parse_sqs_message(sqs_response)
while sqs_message is not None:
# Process it
# Delete if from the queue
# Get next message in queue
sqs_response = get_sqs_task_data('<sqs_queue_url>')
sqs_message = parse_sqs_message(sqs_response)
def get_sqs_task_data(queue_url):
client = boto3.client('sqs')
response = client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1
)
return response
def parse_sqs_message(response_sqs_message):
if 'Messages' not in response_sqs_message:
logging.info('No messages found in queue')
return None
# ... parse it and return a dict
return {
data_1 = ...,
data_2 = ...
}
В общем, довольно просто.
В get_sqs_data()
я явно указываю, что хочу получить только одно сообщение (поскольку одна задача ECS должна обрабатывать только одно сообщение).В parse_sqs_message()
я проверяю, остались ли в очереди некоторые сообщения с
if 'Messages' not in response_sqs_message:
logging.info('No messages found in queue')
return None
Когда в очереди только одно сообщение (то есть запущена одна задача ECS), все работает нормально.Задача ECS может выбрать сообщение, обработать его и удалить.
Однако, когда очередь заполняется сообщениями X (X > 1
) одновременно , запускается задача X ECS, но только задача ECS может извлечь одно из сообщений.и обработать это.Все остальные задачи ECS будут завершены с No messages found in queue
, хотя осталось обработать X - 1
сообщений.
Почему это?Почему другие задачи не могут выбрать оставленные сообщения?
Если это имеет значение, VisibilityTimeout
SQS устанавливается на 30 минут.
Любая помощь будет принята с благодарностью!
Не стесняйтесь просить о большей точности, если хотите.