Каждая партия событий, отправляемых в журналы CloudWatch, должна содержать токен последовательности (из документа PutLogEvents API ):
{
"logEvents": [
{
"message": "string",
"timestamp": number
}
],
"logGroupName": "string",
"logStreamName": "string",
"sequenceToken": "string"
}
Ответ возвращает nextSequenceToken
(снова из APIдокументы):
{
"nextSequenceToken": "string",
"rejectedLogEventsInfo": {
"expiredLogEventEndIndex": number,
"tooNewLogEventStartIndex": number,
"tooOldLogEventEndIndex": number
}
}
Итак, краткий ответ на ваш вопрос: если у вас есть один продюсер, пишущий в поток, вы можете сохранить nextSequenceToken
и использовать его для заполнения sequenceToken
для следующего PutLogEvents
request.
Более длинный ответ состоит в том, что вы не можете использовать эту технику, если у вас есть несколько производителей, потому что производитель не имеет доступа к ответу на запрос другого производителя.Вместо этого вы должны вызывать DescribeLogStreams перед каждым запросом.Следующий код извлечен из среды ведения журналов Java , которую я написал (включая ссылки на функции, которые здесь не показаны, и могут содержать синтаксические ошибки, поскольку я исключил материал, специфичный для библиотеки журналов):
/**
* This function retrieves the current information for a specific log stream.
* DescribeLogStreams is a paginated operation, which means that we have to
* be prepared for a large number of rows in the response, but since we're
* passing the full stream name as a prefix this should never happen.
*/
private LogStream findLogStream(String logGroupName, String logStreamName)
{
DescribeLogStreamsRequest request = new DescribeLogStreamsRequest()
.withLogGroupName(logGroupName)
.withLogStreamNamePrefix(logStreamName);
DescribeLogStreamsResult result;
do
{
result = client.describeLogStreams(request);
for (LogStream stream : result.getLogStreams())
{
if (stream.getLogStreamName().equals(logStreamName))
return stream;
}
request.setNextToken(result.getNextToken());
} while (result.getNextToken() != null);
return null;
}
/**
* This function tries to send a batch of messages, retrieving the sequence
* number for each batch and handling the data race if another process has
* made that sequence number invalid.
*/
private List<LogMessage> attemptToSend(List<LogMessage> batch)
{
if (batch.isEmpty())
return batch;
PutLogEventsRequest request = new PutLogEventsRequest()
.withLogGroupName(config.logGroupName)
.withLogStreamName(config.logStreamName)
.withLogEvents(constructLogEvents(batch));
for (int ii = 0 ; ii < 5 ; ii++)
{
LogStream stream = findLogStream();
try
{
request.setSequenceToken(stream.getUploadSequenceToken());
client.putLogEvents(request);
return Collections.emptyList();
}
catch (InvalidSequenceTokenException ex)
{
stats.updateWriterRaceRetries();
Utils.sleepQuietly(100);
// continue retry loop
}
catch (DataAlreadyAcceptedException ex)
{
reportError("received DataAlreadyAcceptedException, dropping batch", ex);
return Collections.emptyList();
}
catch (Exception ex)
{
reportError("failed to send batch", ex);
return batch;
}
}
reportError("received repeated InvalidSequenceTokenException responses -- increase batch delay?", null);
stats.updateUnrecoveredWriterRaceRetries();
return batch;
}
Большинство исключений, которые вы получите от запроса PutLogEvents
, невозможно восстановить, поэтому этот код их игнорирует.InvalidSequenceTokenException
, однако, указывает, что между двумя производителями произошла гонка, и другой производитель смог записать пакет между временем, когда этот производитель получил описание потока и попытался написать его пакет.Это маловероятно, но возможно, поэтому он делает несколько повторных попыток, а затем отклоняет пакет (он помещается в очередь для другой попытки).
Есть еще один последний ответ, который может быть важным для вас: CloudWatch имеет правила о временных метках событий в пакете (не слишком далеко в прошлом или будущем).Если ваш пакет содержит события, выходящие за пределы этого диапазона, они будут удалены, но остальные события будут добавлены в поток.Вы можете увидеть, происходит ли это, посмотрев на объект rejectedLogEventsInfo
в ответе, который будет иметь ненулевые индексы, если какие-либо записи были отброшены (для каркаса ведения журнала это вряд ли произойдет, и для него нет исправлений, поэтомуЯ просто игнорирую это значение ответа).