Что такое nextSequenceToken для API PutLogEvents для AWS.CloudWatchLogs - PullRequest
0 голосов
/ 09 июля 2019

Что такое nextSequenceToken, упомянутое здесь?

https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html

Непонятно мне и, вероятно, другим.

1 Ответ

1 голос
/ 09 июля 2019

Каждая партия событий, отправляемых в журналы CloudWatch, должна содержать токен последовательности (из документа PutLogEvents API ):

{
   "logEvents": [ 
      { 
         "message": "string",
         "timestamp": number
      }
   ],
   "logGroupName": "string",
   "logStreamName": "string",
   "sequenceToken": "string"
}

Ответ возвращает nextSequenceToken (снова из APIдокументы):

{
   "nextSequenceToken": "string",
   "rejectedLogEventsInfo": { 
      "expiredLogEventEndIndex": number,
      "tooNewLogEventStartIndex": number,
      "tooOldLogEventEndIndex": number
   }
}

Итак, краткий ответ на ваш вопрос: если у вас есть один продюсер, пишущий в поток, вы можете сохранить nextSequenceToken и использовать его для заполнения sequenceToken для следующего PutLogEvents request.

Более длинный ответ состоит в том, что вы не можете использовать эту технику, если у вас есть несколько производителей, потому что производитель не имеет доступа к ответу на запрос другого производителя.Вместо этого вы должны вызывать DescribeLogStreams перед каждым запросом.Следующий код извлечен из среды ведения журналов Java , которую я написал (включая ссылки на функции, которые здесь не показаны, и могут содержать синтаксические ошибки, поскольку я исключил материал, специфичный для библиотеки журналов):

/**
 *  This function retrieves the current information for a specific log stream.
 *  DescribeLogStreams is a paginated operation, which means that we have to
 *  be prepared for a large number of rows in the response, but since we're
 *  passing the full stream name as a prefix this should never happen.
 */
private LogStream findLogStream(String logGroupName, String logStreamName)
{
    DescribeLogStreamsRequest request = new DescribeLogStreamsRequest()
                                        .withLogGroupName(logGroupName)
                                        .withLogStreamNamePrefix(logStreamName);
    DescribeLogStreamsResult result;
    do
    {
        result = client.describeLogStreams(request);
        for (LogStream stream : result.getLogStreams())
        {
            if (stream.getLogStreamName().equals(logStreamName))
                return stream;
        }
        request.setNextToken(result.getNextToken());
    } while (result.getNextToken() != null);
    return null;
}

/**
 *  This function tries to send a batch of messages, retrieving the sequence
 *  number for each batch and handling the data race if another process has
 *  made that sequence number invalid.
 */
private List<LogMessage> attemptToSend(List<LogMessage> batch)
{
    if (batch.isEmpty())
        return batch;

    PutLogEventsRequest request = new PutLogEventsRequest()
                                  .withLogGroupName(config.logGroupName)
                                  .withLogStreamName(config.logStreamName)
                                  .withLogEvents(constructLogEvents(batch));

    for (int ii = 0 ; ii < 5 ; ii++)
    {
        LogStream stream = findLogStream();

        try
        {
            request.setSequenceToken(stream.getUploadSequenceToken());
            client.putLogEvents(request);
            return Collections.emptyList();
        }
        catch (InvalidSequenceTokenException ex)
        {
            stats.updateWriterRaceRetries();
            Utils.sleepQuietly(100);
            // continue retry loop
        }
        catch (DataAlreadyAcceptedException ex)
        {
            reportError("received DataAlreadyAcceptedException, dropping batch", ex);
            return Collections.emptyList();
        }
        catch (Exception ex)
        {
            reportError("failed to send batch", ex);
            return batch;
        }
    }

    reportError("received repeated InvalidSequenceTokenException responses -- increase batch delay?", null);
    stats.updateUnrecoveredWriterRaceRetries();
    return batch;
}

Большинство исключений, которые вы получите от запроса PutLogEvents, невозможно восстановить, поэтому этот код их игнорирует.InvalidSequenceTokenException, однако, указывает, что между двумя производителями произошла гонка, и другой производитель смог записать пакет между временем, когда этот производитель получил описание потока и попытался написать его пакет.Это маловероятно, но возможно, поэтому он делает несколько повторных попыток, а затем отклоняет пакет (он помещается в очередь для другой попытки).

Есть еще один последний ответ, который может быть важным для вас: CloudWatch имеет правила о временных метках событий в пакете (не слишком далеко в прошлом или будущем).Если ваш пакет содержит события, выходящие за пределы этого диапазона, они будут удалены, но остальные события будут добавлены в поток.Вы можете увидеть, происходит ли это, посмотрев на объект rejectedLogEventsInfo в ответе, который будет иметь ненулевые индексы, если какие-либо записи были отброшены (для каркаса ведения журнала это вряд ли произойдет, и для него нет исправлений, поэтомуЯ просто игнорирую это значение ответа).

...