AWS Lambda с Python записать в CloudWatch throw DataAlreadyAcceptedException - PullRequest
0 голосов
/ 27 мая 2020

Привет, я создаю лямбда-функцию AWS с API Gateway для записи журнала из ngx-logger в моем проекте Angular в CloudWatch. Вот лямбда-функция python

def log(event, context):
    log_streams = logs.describe_log_streams(logGroupName=LOG_GROUP)
    nextSequenceToken=log_streams['logStreams'][0]['uploadSequenceToken']

    timestamp = int(round(time.time() * 1000))

    result = logs.put_log_events(
        logGroupName=LOG_GROUP,
        logStreamName=LOG_STREAM,
        logEvents=[
            {
                'timestamp': timestamp,
                'message': Hello world, here is our first log message!'
            }
        ],
        sequenceToken=nextSequenceToken)

    return {
    'statusCode': 200,
    'body': json.dumps('Hello from Lambda!')
}

Однако, когда я запускаю свой интерфейс с несколькими ошибками, я вижу, что некоторые ошибки регистрируются, а некоторые ошибки нет, за следующим исключением

DataAlreadyAcceptedException: An error occurred (DataAlreadyAcceptedException) when calling the PutLogEvents operation: The given batch of log events has already been accepted. The next batch can be sent with sequenceToken: xxxxxxxxxxxxxxxxxxx

Я - вещь, если это потому, что Lambda является asyn c и два запроса получают один и тот же следующий порядковый номер. Как я могу избежать этой проблемы? Спасибо

1 Ответ

0 голосов
/ 30 мая 2020

Насколько я понимаю, если поток журнала создается заново, мне не нужно устанавливать это значение. Но если я повторно использую существующий поток, это поле обязательно. Пожалуйста, поправьте меня, если я ошибаюсь.

Вы абсолютно правы, и я прошу прощения за то, что не прочитал более внимательно spe c.

Вот быстрый пример, который я бросил вместе, чтобы продемонстрировать, как я могу решить эту проблему. Это, конечно, не является надежным в зависимости от объема, с которым вы ожидаете работать. (Простите, что решение находится в JavaScript, а не в Python; это то, с чем я более знаком.)

const AWS = require('aws-sdk');
const cloudwatchlogs = new AWS.CloudWatchLogs();

exports.handler = async(event) => {
    const LOG_GROUP_NAME = 'test-logs';
    const LOG_STREAM_NAME = 'test-stream';

    let nextSequenceToken = null;

    // Naive initial usage. Since we're including an empty nextSequenceToken
    // we'll call describeLogStreams first to retrieve the token
    nextSequenceToken = await log(
        LOG_GROUP_NAME,
        LOG_STREAM_NAME,
        'log 1', 
        nextSequenceToken,
    );

    // This is the happy path. It will still fail on the first log attempt if
    // another process has written to the stream. Note that we intentionally
    // don't update nextSequenceToken
    await log(
        LOG_GROUP_NAME,
        LOG_STREAM_NAME,
        'log 2', 
        nextSequenceToken,
    );

    // This will definitely fail on the first attempt since we didn't update
    // nextSequenceToken above -- but the first retry will succeed
    await log(
        LOG_GROUP_NAME,
        LOG_STREAM_NAME,
        'log 3', 
        nextSequenceToken,
    );

    return {};
};

const log = async(
    logGroupName,
    logStreamName,
    message,
    sequenceToken = null,
    retries = 3,
) => {
    if (sequenceToken == null) {
        sequenceToken = await retrieveNextSequenceToken(
            logGroupName,
            logStreamName,
        );
    }

    var params = {
        logEvents: [{
            message,
            timestamp: Date.now(),
        }],
        logGroupName,
        logStreamName,
        sequenceToken,
    };

    return await new Promise((resolve, reject) => {
        cloudwatchlogs.putLogEvents(params, function(err, data) {
            if (err) {
                if (
                    retries > 0 &&
                    err.code === 'InvalidSequenceTokenException'
                ) {
                    console.log('invalid sequence token, retrying');
                    const nextSequenceToken =
                        /sequenceToken is: (.*)/.exec(err.message)[1];
                    console.log('nextSequenceToken: ' + nextSequenceToken);
                    log(
                        logGroupName,
                        logStreamName,
                        message,
                        nextSequenceToken,
                        retries - 1,
                    ).then(resolve).catch(reject);
                }
                else {
                    reject(err);
                }
            }
            else {
                resolve(data.nextSequenceToken);
            }
        });
    });
};

const retrieveNextSequenceToken = async(logGroupName, logStreamName) => {
    const params = {
        logGroupName,
        descending: true,
        limit: 1,
        logStreamNamePrefix: logStreamName,
        orderBy: 'LogStreamName',
    };

    return await new Promise((resolve, reject) => {
        cloudwatchlogs.describeLogStreams(params, function(err, data) {
            if (err) {
                reject(err);
            }
            else {
                resolve(data.logStreams[0].uploadSequenceToken);
            }
        });
    });
};

Журналы выполнения:

START RequestId: 64c93ac7-d7a3-415d-a65f-88f9a01f5e31 Version: $LATEST
2020-05-30T00:26:47.987Z    64c93ac7-d7a3-415d-a65f-88f9a01f5e31    INFO    invalid sequence token, retrying
2020-05-30T00:26:48.025Z    64c93ac7-d7a3-415d-a65f-88f9a01f5e31    INFO    nextSequenceToken: 49605487572640549775784175775008805339581397298215084354
END RequestId: 64c93ac7-d7a3-415d-a65f-88f9a01f5e31

Это своего рода hack - и даже AWS не пытается этого сделать при работе с потенциально параллельными потоками данных (посмотрите, как выполнение Lambda создает новые потоки для каждого основного бегуна). Если у вас нет веских причин для отправки журналов в тот же поток, я бы посоветовал следовать аналогичной схеме.

...