Не получает все сообщения на лямбда получателя от SQS с AWS без сервера и NodeJS - PullRequest
0 голосов
/ 21 сентября 2019

Мы создали две функции lamdba с использованием AWS Serverless и NodeJS, одна из которых является отправителем, а другая - получателем, и между ними есть одна стандартная SQS, которая получает данные от лямбды-отправителя и автоматически запускает функцию-лямбда получателя, если поступает какое-либо сообщениев SQS.

Здесь мы можем успешно отправить все сообщения от лямбды-отправителя в SQS (согласно журналам), но в лямбда-получателе мы не получаем все сообщения.

Комупротестировав этот сценарий, я отправил 1000 сообщений от почтальона и проверил, что отправитель отлично отправлял все сообщения в SQS, но получатель получал только 986 ​​случайных сообщений, некоторые случайные сообщения были пропущены между ними.

Я также попробовал тип SQS для FIFO вместо стандартного, но он не поддерживается лямбда-выражением.

Вот код функции отправителя и получателя:

SENDER:

sender: () => {
    const sqs = new AWS.SQS({ apiVersion: "2019-08-09" });
    let body = [
        { id : "1" },
        { id : "2" },
        { id : "3" }
    ]
    let params = {
        DelaySeconds: 10,
        QueueUrl: url,
        MessageBody: JSON.stringify(body)
    };
    sqs.sendMessage(params, function (err, data) {
        if (err) {
            callback(true, null);
        } else {
            callback(false, data);
        }
    });
}

ПОЛУЧАТЕЛЬ:

receiver: () => {
    event.Records[0].body = JSON.parse(event.Records[0].body);
    async.timesSeries(event.Records[0].body.length, (i, next) => {
        const params = {
            TableName: "user",
            Key: {
                id: "1"
            }
        };
        dynamoDb.get(params).promise()
            .then(result => {
                //save user
            })
            .catch(error => {
                //throw err
                next();
            });
    }, () => {
        console.log("deleting message");
        const deleteParams = {
            QueueUrl: "queue_url",
            ReceiptHandle: event.Records[0].receiptHandle
        };
        sqs.deleteMessage(deleteParams, function (err, data) {
            if (err) {
                //throw error
            } else {
                //success
            }
        });
    });
}

1 Ответ

2 голосов
/ 21 сентября 2019

В этом коде:

event.Records[0].body = JSON.parse(event.Records[0].body);
async.timesSeries(event.Records[0].body.length, (i, next) => {

Явно взяв Records[0], вы обрабатываете только первую запись в списке.event.Records - это список, и вам нужно перебирать все записи в списке.См. Пример события SQS в документации здесь , которая включает в себя несколько записей.


В качестве альтернативы, в ваших настройках интеграции Lambda SQS измените Batch Size на 1, так что только одна запись 1 будет передана в вашу функцию Lambda.

...