У меня проблемы с поддержанием работы Kinesis Stream Consumer с использованием библиотеки Boto 3 (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#client). Короче говоря, у меня есть производитель, который анализирует исходный файл каждые 15 минут, фильтрует соответствующие записи и отправляет их в поток Kinesis. .
Вот урезанная версия моего потребителя (вдохновленная следующим постом - https://www.arundhaj.com/blog/getting-started-kinesis-python.html)...
my_stream_name = ‘project_stream'
kinesis_client = boto3.client('kinesis', region_name='us-east-1')
response = kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']
while True:
shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,
ShardId=my_shard_id,
ShardIteratorType='LATEST')
my_shard_iterator = shard_iterator['ShardIterator']
try:
record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator,
Limit=1)
while 'NextShardIterator' in record_response:
record_response = kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'],
Limit=1)
if len(record_response["Records"]) > 0:
#Do stuff with record
time.sleep(1)
else:
time.sleep(30)
except ClientError as e:
time.sleep(300)
Часть моей проблемы заключается в том, что пока нет данных, которые передаются в течение длительного времени (не менее 10 минут). И кажется, что ShardIterator сохраняется 5 минут максимум (https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html).
Я пытался справиться с этим, фиксируя ошибки, заставляя скрипт работать в течение 5 минут, перехватывая другого итератора и запуская заново. И это похоже на работу. Но только на короткое время (например, от 24 до 48 часов). Затем скрипт завершается без ошибок.
2 вопроса:
- Есть ли лучший способ обработки потребителя Always On, который прослушивает поток, который не всегда активен?
- Должен ли я просто избегать использования типа LATEST ShardIterator и использовать решение, которое будет запускать скрипт каждые 15 минут и использовать тип AT_TIMESTAMP типа ShardIterator (https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)?
Мне кажется, что вариант 2, вероятно, является лучшим в этом сценарии, но я бы хотел иметь потребителя Always On, поскольку я планирую в будущем отправить больше данных в этот поток и затем потребуется перенастроить моего потребителя.
Идеи? Очевидные недостатки моей логики? Спасибо всем за помощь!