Я пытаюсь получить / распечатать / прочитать все данные (в формате json) из AWS Kinesis Steam, используя приведенный ниже лямбда-код функции.но я могу получить только 20 записей в качестве выходных данных (ожидаемые записи 60 * 100) после запуска в течение 1 минуты (настроенное время ожидания в лямбда-диапазоне до 1 минуты).Загружаю данные в потоки со скоростью 100 записей в секунду, используя генератор данных Kinesis.Я настроил поток кинезиса с двумя открытыми осколками и сроком хранения данных 24 часа.
from __future__ import print_function
import boto3
from datetime import datetime
import time
def lambda_handler(event, context):
my_stream_name = 'Consumption_Stream'
kinesis_client = boto3.client('kinesis', region_name='us-east-1')
response = kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']
shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,
ShardId=my_shard_id,
ShardIteratorType='TRIM_HORIZON')
my_shard_iterator = shard_iterator['ShardIterator']
record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator,Limit=100)
while 'NextShardIterator' in record_response:
record_response = kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'],Limit=100)
if(record_response['Records']):
#print (record_response)
print (record_response['Records'][0]['Data'])
paylod = base64.b64decode(record_response['Records'][0]['Data'])
print("Decoded payload: " + payload)
# wait for 1 seconds before looping back around to see if there is any more data to read
time.sleep(1)