Как получить партию SQS от производителя лямбда к потребителю лямбда? - PullRequest
1 голос
/ 25 апреля 2019

Производитель AWS лямбда протягивает партию, например, 3 сообщения с использованием sqs.sendMessageBatch () в стандартную очередь AWS SQS. Однако лямбда-потребитель AWS вызывается 3 раза для каждой записи исходного пакета SQS.

Как я могу получить партию SQS от партии производителя до партии потребителя в ее объекте?

sqs.sendMessageBatch ()

Весь пакет сообщений запускает лямбду. Событие лямбда-потребителя SQS получает весь входной пакет.

Я установил размер пакета для лямбды-потребителя по умолчанию равным 10. Производитель лямбда-сообщений отправляет пакет, содержащий 3 сообщения (записи). Я наблюдаю, что лямбда-гетер потребителя вызывается 3 раза для каждого сообщения в пакете:

// PRODUCER
async function sendMessageBatch(params) {
    return new Promise(function (res, rej) {
        sqs.sendMessageBatch(params, function (err, data) {
            if (err) {
                rej(err);
            } else {
                res(data);
            }
        });
    });
}


exports.handler = async (event) => {
    .
    .
    .
    .
    var transaction;
    var entries = [];
    var entry;
    var transactions = await scan(sparams);
    // e.g. 3 messages
    for (var i = 0; i < transactions.length; i++) {
        transaction = transactions[i];
        console.log("TX: " + JSON.stringify(transaction));
        var msgBody = JSON.stringify({
            asset: transaction.asset,
            lastUpdatedAt: transaction.lastUpdatedAt,
            to: transaction.to,
            qty: transaction.qty,

        });
        entry = {
            Id: uuidv1(),
            MessageBody: msgBody,
            DelaySeconds: 0
        };
        entries.push(entry);
    }

    if (entries.length) {
        console.log("Entries: " + JSON.stringify(entries));
        var sendMessageBatchParams = {
            Entries: entries,
            QueueUrl: QUEUE_URL
        };
        await sendMessageBatch(sendMessageBatchParams);
    }
    return {};
}

//CONSUMER
exports.handler = async (event) => {
    // expected: 3 messages in event.Records (batch)
    for (var record of event.Records) {
        .
        .
        .
    }
}

1 Ответ

2 голосов
/ 25 апреля 2019

В блоге без сервера есть действительно замечательная статья об интеграции Lambda и SQS с отдельным разделом, посвященным пакетной обработке.

Убедитесь, что вы правильно установили batchSize (максимальное количество сообщений для обработки). Если важна ровно одна доставка, убедитесь, что ваша очередь является очередью FIFO, в противном случае убедитесь, что вы отметили второй момент в планировании устойчивости, убедившись, что обработка идемпотентна.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...