Какую конфигурацию итератора Kinesis Stream Shard использовать для редких данных - PullRequest
0 голосов
/ 12 марта 2019

У меня проблемы с поддержанием работы Kinesis Stream Consumer с использованием библиотеки Boto 3 (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#client). Короче говоря, у меня есть производитель, который анализирует исходный файл каждые 15 минут, фильтрует соответствующие записи и отправляет их в поток Kinesis. .

Вот урезанная версия моего потребителя (вдохновленная следующим постом - https://www.arundhaj.com/blog/getting-started-kinesis-python.html)...

my_stream_name = ‘project_stream'
kinesis_client = boto3.client('kinesis', region_name='us-east-1')
response = kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']

while True:
    shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,
                                                       ShardId=my_shard_id,
                                                       ShardIteratorType='LATEST')
    my_shard_iterator = shard_iterator['ShardIterator']

    try:
        record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator,
                                                      Limit=1)
        while 'NextShardIterator' in record_response:
            record_response = kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'],
                                                          Limit=1)
            if len(record_response["Records"]) > 0:
                #Do stuff with record
                time.sleep(1)
            else:
                time.sleep(30)

    except ClientError as e:
        time.sleep(300)

Часть моей проблемы заключается в том, что пока нет данных, которые передаются в течение длительного времени (не менее 10 минут). И кажется, что ShardIterator сохраняется 5 минут максимум (https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html).

Я пытался справиться с этим, фиксируя ошибки, заставляя скрипт работать в течение 5 минут, перехватывая другого итератора и запуская заново. И это похоже на работу. Но только на короткое время (например, от 24 до 48 часов). Затем скрипт завершается без ошибок.

2 вопроса:

  1. Есть ли лучший способ обработки потребителя Always On, который прослушивает поток, который не всегда активен?
  2. Должен ли я просто избегать использования типа LATEST ShardIterator и использовать решение, которое будет запускать скрипт каждые 15 минут и использовать тип AT_TIMESTAMP типа ShardIterator (https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)?

Мне кажется, что вариант 2, вероятно, является лучшим в этом сценарии, но я бы хотел иметь потребителя Always On, поскольку я планирую в будущем отправить больше данных в этот поток и затем потребуется перенастроить моего потребителя.

Идеи? Очевидные недостатки моей логики? Спасибо всем за помощь!

...