Насколько я понимаю, если поток журнала создается заново, мне не нужно устанавливать это значение. Но если я повторно использую существующий поток, это поле обязательно. Пожалуйста, поправьте меня, если я ошибаюсь.
Вы абсолютно правы, и я прошу прощения за то, что не прочитал более внимательно spe c.
Вот быстрый пример, который я бросил вместе, чтобы продемонстрировать, как я могу решить эту проблему. Это, конечно, не является надежным в зависимости от объема, с которым вы ожидаете работать. (Простите, что решение находится в JavaScript, а не в Python; это то, с чем я более знаком.)
const AWS = require('aws-sdk');
const cloudwatchlogs = new AWS.CloudWatchLogs();
exports.handler = async(event) => {
const LOG_GROUP_NAME = 'test-logs';
const LOG_STREAM_NAME = 'test-stream';
let nextSequenceToken = null;
// Naive initial usage. Since we're including an empty nextSequenceToken
// we'll call describeLogStreams first to retrieve the token
nextSequenceToken = await log(
LOG_GROUP_NAME,
LOG_STREAM_NAME,
'log 1',
nextSequenceToken,
);
// This is the happy path. It will still fail on the first log attempt if
// another process has written to the stream. Note that we intentionally
// don't update nextSequenceToken
await log(
LOG_GROUP_NAME,
LOG_STREAM_NAME,
'log 2',
nextSequenceToken,
);
// This will definitely fail on the first attempt since we didn't update
// nextSequenceToken above -- but the first retry will succeed
await log(
LOG_GROUP_NAME,
LOG_STREAM_NAME,
'log 3',
nextSequenceToken,
);
return {};
};
const log = async(
logGroupName,
logStreamName,
message,
sequenceToken = null,
retries = 3,
) => {
if (sequenceToken == null) {
sequenceToken = await retrieveNextSequenceToken(
logGroupName,
logStreamName,
);
}
var params = {
logEvents: [{
message,
timestamp: Date.now(),
}],
logGroupName,
logStreamName,
sequenceToken,
};
return await new Promise((resolve, reject) => {
cloudwatchlogs.putLogEvents(params, function(err, data) {
if (err) {
if (
retries > 0 &&
err.code === 'InvalidSequenceTokenException'
) {
console.log('invalid sequence token, retrying');
const nextSequenceToken =
/sequenceToken is: (.*)/.exec(err.message)[1];
console.log('nextSequenceToken: ' + nextSequenceToken);
log(
logGroupName,
logStreamName,
message,
nextSequenceToken,
retries - 1,
).then(resolve).catch(reject);
}
else {
reject(err);
}
}
else {
resolve(data.nextSequenceToken);
}
});
});
};
const retrieveNextSequenceToken = async(logGroupName, logStreamName) => {
const params = {
logGroupName,
descending: true,
limit: 1,
logStreamNamePrefix: logStreamName,
orderBy: 'LogStreamName',
};
return await new Promise((resolve, reject) => {
cloudwatchlogs.describeLogStreams(params, function(err, data) {
if (err) {
reject(err);
}
else {
resolve(data.logStreams[0].uploadSequenceToken);
}
});
});
};
Журналы выполнения:
START RequestId: 64c93ac7-d7a3-415d-a65f-88f9a01f5e31 Version: $LATEST
2020-05-30T00:26:47.987Z 64c93ac7-d7a3-415d-a65f-88f9a01f5e31 INFO invalid sequence token, retrying
2020-05-30T00:26:48.025Z 64c93ac7-d7a3-415d-a65f-88f9a01f5e31 INFO nextSequenceToken: 49605487572640549775784175775008805339581397298215084354
END RequestId: 64c93ac7-d7a3-415d-a65f-88f9a01f5e31
Это своего рода hack - и даже AWS не пытается этого сделать при работе с потенциально параллельными потоками данных (посмотрите, как выполнение Lambda создает новые потоки для каждого основного бегуна). Если у вас нет веских причин для отправки журналов в тот же поток, я бы посоветовал следовать аналогичной схеме.