Как вернуть сообщение обратно в SQS из лямбда-триггера - PullRequest
0 голосов
/ 17 декабря 2018

У меня есть лямбда-триггер, который читает сообщения из очереди SQS.В некоторых случаях сообщение может быть не готово к обработке, поэтому я хотел бы поместить сообщение в очередь на 1 минуту и ​​повторить попытку.В настоящее время я создаю еще одну копию этой записи клиента и помещаю эту новую копию в очередь.Есть ли причина / способ сохранить исходную запись в очереди, а не создавать новую

def postToQueue(customer):

    if 'attemptCount' in customer.keys():
        attemptCount = int(customer["attemptCount"]) + 1
    else:
        attemptCount = 2
    customer["attemptCount"] = attemptCount

    # Get the service resource
    sqs = boto3.resource('sqs')

    # Get the queue
    queue = sqs.get_queue_by_name(QueueName='testCustomerQueue')
response = queue.send_message(MessageBody=json.dumps(customer), DelaySeconds=60)

    print('customer postback: ', customer)
    print ('response from writing ot the queue is: ', response)

#main function
for record in event['Records']:
    if 'body' in record.keys():
        customer = json.loads(record['body'])
        print("attempting to process customer", customer, " at: ", datetime.datetime.now())
        if (not ifReadyToProcess(customer)):
            postToQueue(customer)
        else:
            processCustomer(customer)

1 Ответ

0 голосов
/ 17 декабря 2018

Это не идеальная установка для запуска лямбда-функций SQS.

Мои тесты показывают, что сообщения, отправленные в SQS, будут немедленно запускать лямбда-функцию , даже если задана настройка задержки.Таким образом, помещение сообщения обратно в очередь SQS вызовет повторное срабатывание Lambda сразу после.

Чтобы избежать ситуации, когда Lambda постоянно проверяет, готово ли сообщение к обработке, я бы порекомендовал:

  • Использование Amazon CloudWatch Events для запуска функции Lambda по расписанию (например, каждые 2 минуты)
  • Функция Lambda должна извлекать сообщения из очереди и проверять, готовы ли они к обработке.
    • Если они готовы, обработайте их и удалите
    • Если они не готовы, а затем отправьте их обратно в очередь с задержкой 1019* установка и удаление исходного сообщения

Обратите внимание, что это отличается от того, чтобы SQS напрямую запускал Lambda.Вместо этого лямбда-функция должна вызывать ReceiveMessages(), чтобы получить само сообщение (я), что позволяет функции задержки добавлять некоторое время между проверками.

Другой вариант: вместо повторной вставки сообщения в очередьВы можете просто воспользоваться установкой Default Visibility Timeout , удалив сообщение , а не .Сообщение, которое прочитано из очереди, но не удалено, автоматически «появится» в очереди.Вы можете использовать это как период повторных попыток.Однако это означает, что вам придется самостоятельно обрабатывать мертвые письма (например, если сообщение не может быть обработано после n попыток).

...