Да, данные остаются в потоке Kinesis до истечения срока хранения.
К сожалению, они недоступны с Kinesis Firehose. Когда Firehose инициализирует свои итераторы сегментов, он использует режим LATEST
, чтобы начать сбор данных в тот момент, когда он начинает работать. Кажется, нет никакого способа настроить Firehose для получения записей, которые были отправлены в Kinesis с более ранних версий.
Однако! Полностью возможно извлечь данные из потока Kinesis, используя любой процесс, который может инициализировать итератор сегмента в режиме TRIM_HORIZON
или AT_TIMESTAMP
. Это можно сделать с помощью awscli
, но в итоге я написал сценарий Python с помощью boto3 для сбора данных из сегмента. Выглядело это примерно так.
response = client.get_shard_iterator(
StreamName=StreamName,
ShardId=ShardId,
ShardIteratorType='AT_TIMESTAMP',
Timestamp=datetime(2020, 4, 7, 1, 0, 0),
)
while True:
response = client.get_records(
ShardIterator=ShardIterator,
Limit=150
)
if response['MillisBehindLatest'] == 0:
break
for record in response['Records']:
sys.stdout.buffer.write(record['Data'])
ShardIterator = response['NextShardIterator']
Также будьте осторожны: если итератор запускается в то время, когда нет доступных данных, get_records()
может возвращать пустой набор нулевых записей []
для многих итераций до тех пор, пока Итератор догоняет время, когда есть данные. Это случалось со мной каждый раз, когда я использовал TRIM_HORIZON
, и поэтому я переключился на использование AT_TIMESTAMP
, чтобы я мог перейти туда, где начинались данные.