Что происходит с данными Kinesis, когда нет привязанных потребителей? - PullRequest
0 голосов
/ 07 апреля 2020

Я установил новый поток данных Kinesis и записал в него некоторые данные. Потом я понял, что еще не настроил Kinesis Firehose, поэтому я тоже это настроил. Я ожидал, что Firehose соберет данные, которые были записаны в поток ранее, и выгрузит их в S3, но обнаружил, что Firehose только сбрасывал данные, которые были записаны в поток после его присоединения.

Что происходит с данными что записывается в поток данных Kinesis, когда нет подключенных потребителей? Для потока настроен период хранения 24 часа, поэтому я предполагаю, что данные все еще где-то там (по крайней мере, на данный момент). Это все еще там, и есть ли какой-нибудь способ для меня добраться до этого?

1 Ответ

0 голосов
/ 07 апреля 2020

Да, данные остаются в потоке Kinesis до истечения срока хранения.

К сожалению, они недоступны с Kinesis Firehose. Когда Firehose инициализирует свои итераторы сегментов, он использует режим LATEST, чтобы начать сбор данных в тот момент, когда он начинает работать. Кажется, нет никакого способа настроить Firehose для получения записей, которые были отправлены в Kinesis с более ранних версий.

Однако! Полностью возможно извлечь данные из потока Kinesis, используя любой процесс, который может инициализировать итератор сегмента в режиме TRIM_HORIZON или AT_TIMESTAMP. Это можно сделать с помощью awscli, но в итоге я написал сценарий Python с помощью boto3 для сбора данных из сегмента. Выглядело это примерно так.

response = client.get_shard_iterator(
    StreamName=StreamName,
    ShardId=ShardId,
    ShardIteratorType='AT_TIMESTAMP',
    Timestamp=datetime(2020, 4, 7, 1, 0, 0),
)

while True:
    response = client.get_records(
        ShardIterator=ShardIterator,
        Limit=150
    )
    if response['MillisBehindLatest'] == 0:
        break
    for record in response['Records']:
        sys.stdout.buffer.write(record['Data'])
    ShardIterator = response['NextShardIterator']

Также будьте осторожны: если итератор запускается в то время, когда нет доступных данных, get_records() может возвращать пустой набор нулевых записей [] для многих итераций до тех пор, пока Итератор догоняет время, когда есть данные. Это случалось со мной каждый раз, когда я использовал TRIM_HORIZON, и поэтому я переключился на использование AT_TIMESTAMP, чтобы я мог перейти туда, где начинались данные.

...