Как сделать очередь задержки SQS из х часов - PullRequest
1 голос
/ 17 апреля 2020

Мне нужна очередь для обработки сообщений после x часов задержки. И мне нужен управляемый данными весь основанный на событиях подход без использования каких-либо планировщиков и т. для использования различными AWS лямбда-функциями.

одна из лямбда-функций должна обрабатывать сообщения с задержкой в ​​3 часа. Тем не менее, максимальная задержка доставки составляет 15 минут. Если я прочту сообщение в первый раз, оно будет автоматически удалено из SQS, так как я использую триггеры отображения источника событий для вызова лямбда-функции.

Итак, мне интересно, как можно избежать удаления сообщения и сделать его невидимым при первой обработке?

Буду признателен за любые мысли / помощь.

Ответы [ 2 ]

2 голосов
/ 17 апреля 2020

Amazon SQS не будет выполнять то, что вы запрашиваете. Кроме того, я не рекомендую делать какие-либо «трюки», чтобы заставить его задержаться.

Я бы порекомендовал вам взглянуть на AWS Шаг Функции . Он может управлять взаимодействием между AWS лямбда-функциями и может быть настроен на ожидание (спящий режим) в течение некоторого периода перед вызовом AWS лямбда-функции .

1 голос
/ 17 апреля 2020

Я проверял это. И это кажется выполнимым. Я использовал приведенный ниже код для тестов. Тем не менее, это не кажется "хорошей практикой" способ делать то, что вы хотите достичь. Я вижу две основные проблемы:

  1. Существует ограничение для массажа в полете в размере 120 000. Таким образом, вы теряете почти неограниченное масштабирование своей очереди SQS.

  2. Ваши метрики будут завалены ошибочными вызовами, и будет трудно отличить guish настоящие неудачные вызовы от тех, которые не были выполнены намеренно.

Таким образом, я бы посмотрел на другие решения.

import json
import os
import time

import boto3

sqs = boto3.client('sqs')

queue_url = os.environ['QUEUE_URL']

new_visibility_timeout = 120 # seconds

def lambda_handler(event, context):

    print(json.dumps(event))

    current_time = time.time()

    no_of_new_records = 0

    for record in event['Records']:

        msg_timestamp = float(record['attributes']['SentTimestamp'])/1000

        msg_age = current_time - msg_timestamp

        print(f"Message age: {msg_age} seconds")

        if msg_age > new_visibility_timeout:

            print("Message to be successfully processed and deleted from queue")

            response = sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=record['receiptHandle']
            )

            print(response)

        else:

            print("Set long visibility timeout")

            response = sqs.change_message_visibility(
                QueueUrl=queue_url,
                ReceiptHandle=record['receiptHandle'],
                VisibilityTimeout=new_visibility_timeout
            )

            print(response)

            no_of_new_records += 1

    if no_of_new_records > 0:
        raise Exception("Fail the lambda")

    return {'statusCode': 200}
...