Несколько рабочих нитей на осколке в Kinesis - PullRequest
0 голосов
/ 01 марта 2019

У меня есть сценарий python, написанный на Boto-V2.x , работающий на экземпляре EC2, и он создает один рабочий поток.Поток извлекает данные из Kinesis, используя тип итератора LATEST .Я хочу, чтобы в одном и том же шарде было несколько потоков с Тип итератора: AFTER_SEQUENCE_NUMBER .Я читал о Возможность параллельного чтения с использованием KCL (Библиотека Kinesis Consumer) , но я не хочу изобретать колесо в KCL, поскольку я могу получить два потока, работающих на одном и том же шарде с Тип итератора: LATEST .Теперь я хочу, чтобы эти потоки обрабатывали осколок с помощью Типа итератора: AFTER_SEQUENCE_NUMBER и со следующим правилом:

Рабочий поток 1 и Рабочий поток 2 должен обрабатывать записи с разными порядковыми номерами на одном и том же осколке.

Ниже приведен код рабочего класса с итератором Тип: LATEST :

class KinesisWorker(threading.Thread):
    """The Worker thread that repeatedly gets records from a given Kinesis
    stream."""
    def __init__(self, stream_name, shard_id, iterator_type,
                 worker_time=40, sleep_interval=1,
                 name=None, group=None, echo=False, args=(), kwargs={}):
        super(KinesisWorker, self).__init__(name=name, group=group,
                                            args=args, kwargs=kwargs)
        self.stream_name = stream_name
        self.shard_id = str(shard_id)
        self.iterator_type = iterator_type
        self.worker_time = worker_time
        self.sleep_interval = sleep_interval
        self.total_records = 0
        self.echo = echo

    def run(self):
        my_name = threading.current_thread().name
        print ('+ KinesisWorker:', my_name)
        print ('+-> working with iterator:', self.iterator_type)
        response = kinesis.get_shard_iterator(self.stream_name,
                                              self.shard_id, self.iterator_type)
        next_iterator = response['ShardIterator']
        print ('+-> getting next records using iterator:', next_iterator)

        start = datetime.datetime.now()
        finish = start + datetime.timedelta(seconds=self.worker_time)
        while finish > datetime.datetime.now():
            try:
                response = kinesis.get_records(next_iterator, limit=25)
                self.total_records += len(response['Records'])
                if len(response['Records']) > 0:
                    print("------->Response", response['Records'][len(response['Records']) - 1]['SequenceNumber'])

                if len(response['Records']) > 0:
                    print ('\n+-> {1} Got {0} Worker Records'.format(
                        len(response['Records']), my_name))
                    if self.echo:
                        echo_records(response['Records'])
                    else:
                        find_eggs(response['Records'])
                else:
                    sys.stdout.write('.')
                    sys.stdout.flush()
                next_iterator = response['NextShardIterator']
                time.sleep(self.sleep_interval)
            except ProvisionedThroughputExceededException as ptee:
                print (ptee.message)
                time.sleep(5)

Я создаю два рабочих потока вышеупомянутого класса, и я могу получать данные с двух потоков, работающих одновременноиспользуя Тип итератора: LATEST , и я также достиг получения записей с Тип итератора: AFTER_SEQUENCE_NUMBER с синглом нить.

Мой вопрос теперь заключается в том, как мне добиться того, чтобы два рабочих потока читали только те записи, которые удовлетворяют правилу потока порядкового номера, как упомянуто выше?

В настоящее время я поддерживаю начальный файл , который содержит порядковый номер, до которого записи были прочитаны при предыдущем запуске сценария.Каждый раз, когда скрипт запускается, он читает порядковый номер из начального файла , начинает обработку с использованием Тип итератора: AFTER_SEQUENCE_NUMBER и сохраняет новыйпорядковый номер в этом файле при выходе.

Ниже приведен код рабочего класса с типом итератора: AFTER_SEQUENCE_NUMBER :

class KinesisWorker(threading.Thread):
    """The Worker thread that repeatedly gets records from a given Kinesis
    stream."""
    def __init__(self, stream_name, shard_id, iterator_type,starting_sequence_number,
                 worker_time=40, sleep_interval=1,
                 name=None, group=None, echo=False, args=(), kwargs={}):
        super(KinesisWorker, self).__init__(name=name, group=group,
                                            args=args, kwargs=kwargs)
        self.stream_name = stream_name
        self.shard_id = str(shard_id)
        self.iterator_type = iterator_type
        self.worker_time = worker_time
        self.sleep_interval = sleep_interval
        self.total_records = 0
        self.echo = echo
        self.starting_sequence_number = starting_sequence_number
        os.remove("seed")
        self.f = open("seed", "w+")

    def run(self):
        my_name = threading.current_thread().name
        print ('+ KinesisWorker:', my_name)

        print ('+-> working with iterator:', self.iterator_type)
        response = kinesis.get_shard_iterator(self.stream_name,
                                              self.shard_id, shard_iterator_type=self.iterator_type, starting_sequence_number=self.starting_sequence_number)
        print ('+-> getting RESPONSE using iterator:', response)
        next_iterator = response['ShardIterator']
        print ('+-> getting next records using iterator:', next_iterator)

        start = datetime.datetime.now()

        finish = start + datetime.timedelta(seconds=self.worker_time)
        while finish > datetime.datetime.now():
            try:
                response = kinesis.get_records(next_iterator, limit=25)
                if len(response['Records']) > 0:
                    print("------->Response", response['Records'][len(response['Records']) - 1]['SequenceNumber'])
                    if response['Records'][len(response['Records']) - 1]['SequenceNumber'] != None:
                        self.f.write(str(response['Records'][len(response['Records']) - 1]['SequenceNumber'])+"\n") #Writing to seed file
                        self.f.seek(0)
                self.total_records += len(response['Records'])
                if len(response['Records']) > 0:
                    print ('\n+-> {1} Got {0} Worker Records'.format(
                        len(response['Records']), my_name))
                    if self.echo:
                        echo_records(response['Records'])
                    else:
                        find_eggs(response['Records'])
                else:
                    sys.stdout.write('.')
                    sys.stdout.flush()
                next_iterator = response['NextShardIterator']
                time.sleep(self.sleep_interval)
            except ProvisionedThroughputExceededException as ptee:
                print (ptee.message)
                time.sleep(5)

Я виделДокументы, а также Связанные вопросы по StackOverflow , но речь идет об использовании KCL , который я не хочу.Пожалуйста, предложите подход, я застрял.

...