У меня есть сценарий python, написанный на Boto-V2.x , работающий на экземпляре EC2, и он создает один рабочий поток.Поток извлекает данные из Kinesis, используя тип итератора LATEST .Я хочу, чтобы в одном и том же шарде было несколько потоков с Тип итератора: AFTER_SEQUENCE_NUMBER .Я читал о Возможность параллельного чтения с использованием KCL (Библиотека Kinesis Consumer) , но я не хочу изобретать колесо в KCL, поскольку я могу получить два потока, работающих на одном и том же шарде с Тип итератора: LATEST .Теперь я хочу, чтобы эти потоки обрабатывали осколок с помощью Типа итератора: AFTER_SEQUENCE_NUMBER и со следующим правилом:
Рабочий поток 1 и Рабочий поток 2 должен обрабатывать записи с разными порядковыми номерами на одном и том же осколке.
Ниже приведен код рабочего класса с итератором Тип: LATEST :
class KinesisWorker(threading.Thread):
"""The Worker thread that repeatedly gets records from a given Kinesis
stream."""
def __init__(self, stream_name, shard_id, iterator_type,
worker_time=40, sleep_interval=1,
name=None, group=None, echo=False, args=(), kwargs={}):
super(KinesisWorker, self).__init__(name=name, group=group,
args=args, kwargs=kwargs)
self.stream_name = stream_name
self.shard_id = str(shard_id)
self.iterator_type = iterator_type
self.worker_time = worker_time
self.sleep_interval = sleep_interval
self.total_records = 0
self.echo = echo
def run(self):
my_name = threading.current_thread().name
print ('+ KinesisWorker:', my_name)
print ('+-> working with iterator:', self.iterator_type)
response = kinesis.get_shard_iterator(self.stream_name,
self.shard_id, self.iterator_type)
next_iterator = response['ShardIterator']
print ('+-> getting next records using iterator:', next_iterator)
start = datetime.datetime.now()
finish = start + datetime.timedelta(seconds=self.worker_time)
while finish > datetime.datetime.now():
try:
response = kinesis.get_records(next_iterator, limit=25)
self.total_records += len(response['Records'])
if len(response['Records']) > 0:
print("------->Response", response['Records'][len(response['Records']) - 1]['SequenceNumber'])
if len(response['Records']) > 0:
print ('\n+-> {1} Got {0} Worker Records'.format(
len(response['Records']), my_name))
if self.echo:
echo_records(response['Records'])
else:
find_eggs(response['Records'])
else:
sys.stdout.write('.')
sys.stdout.flush()
next_iterator = response['NextShardIterator']
time.sleep(self.sleep_interval)
except ProvisionedThroughputExceededException as ptee:
print (ptee.message)
time.sleep(5)
Я создаю два рабочих потока вышеупомянутого класса, и я могу получать данные с двух потоков, работающих одновременноиспользуя Тип итератора: LATEST , и я также достиг получения записей с Тип итератора: AFTER_SEQUENCE_NUMBER с синглом нить.
Мой вопрос теперь заключается в том, как мне добиться того, чтобы два рабочих потока читали только те записи, которые удовлетворяют правилу потока порядкового номера, как упомянуто выше?
В настоящее время я поддерживаю начальный файл , который содержит порядковый номер, до которого записи были прочитаны при предыдущем запуске сценария.Каждый раз, когда скрипт запускается, он читает порядковый номер из начального файла , начинает обработку с использованием Тип итератора: AFTER_SEQUENCE_NUMBER и сохраняет новыйпорядковый номер в этом файле при выходе.
Ниже приведен код рабочего класса с типом итератора: AFTER_SEQUENCE_NUMBER :
class KinesisWorker(threading.Thread):
"""The Worker thread that repeatedly gets records from a given Kinesis
stream."""
def __init__(self, stream_name, shard_id, iterator_type,starting_sequence_number,
worker_time=40, sleep_interval=1,
name=None, group=None, echo=False, args=(), kwargs={}):
super(KinesisWorker, self).__init__(name=name, group=group,
args=args, kwargs=kwargs)
self.stream_name = stream_name
self.shard_id = str(shard_id)
self.iterator_type = iterator_type
self.worker_time = worker_time
self.sleep_interval = sleep_interval
self.total_records = 0
self.echo = echo
self.starting_sequence_number = starting_sequence_number
os.remove("seed")
self.f = open("seed", "w+")
def run(self):
my_name = threading.current_thread().name
print ('+ KinesisWorker:', my_name)
print ('+-> working with iterator:', self.iterator_type)
response = kinesis.get_shard_iterator(self.stream_name,
self.shard_id, shard_iterator_type=self.iterator_type, starting_sequence_number=self.starting_sequence_number)
print ('+-> getting RESPONSE using iterator:', response)
next_iterator = response['ShardIterator']
print ('+-> getting next records using iterator:', next_iterator)
start = datetime.datetime.now()
finish = start + datetime.timedelta(seconds=self.worker_time)
while finish > datetime.datetime.now():
try:
response = kinesis.get_records(next_iterator, limit=25)
if len(response['Records']) > 0:
print("------->Response", response['Records'][len(response['Records']) - 1]['SequenceNumber'])
if response['Records'][len(response['Records']) - 1]['SequenceNumber'] != None:
self.f.write(str(response['Records'][len(response['Records']) - 1]['SequenceNumber'])+"\n") #Writing to seed file
self.f.seek(0)
self.total_records += len(response['Records'])
if len(response['Records']) > 0:
print ('\n+-> {1} Got {0} Worker Records'.format(
len(response['Records']), my_name))
if self.echo:
echo_records(response['Records'])
else:
find_eggs(response['Records'])
else:
sys.stdout.write('.')
sys.stdout.flush()
next_iterator = response['NextShardIterator']
time.sleep(self.sleep_interval)
except ProvisionedThroughputExceededException as ptee:
print (ptee.message)
time.sleep(5)
Я виделДокументы, а также Связанные вопросы по StackOverflow , но речь идет об использовании KCL , который я не хочу.Пожалуйста, предложите подход, я застрял.