Кафка проверяет тему на сообщения - PullRequest
0 голосов
/ 18 марта 2019

У меня есть приложение на Python, которое читает из 2 тем. Один из них похож на пакетный процесс (всегда запущен), а другой - на специальный.

Я хочу, чтобы приложение выполняло пакетный процесс, но проверил специальное решение после обработки нескольких записей (это цель). И если в специальной теме есть элементы, то сначала выполните это, а затем вернитесь к пакетному процессу

Во всяком случае, это то, с чего я начал:

while True:
    for msg_1 in consumer_1:
        print('consumer_1 {0}'.format(msg_1.value)

        for msg_2 in consumer_2:
            print('consumer_2 {0}'.format(msg_2.value)

Конечно, это не работает. Он застревает во втором цикле for.

Я прочитал этот пост и попытался его реализовать ( Kafka Consumer - тема (ы) с более высоким приоритетом ). Ответ был дан в 2017 году, и я думаю, что с тех пор в API произошли изменения.

while True:
    for msg_1 in consumer_1:
        print('consumer_1 {0}'.format(msg_1.value)

        client = SimpleClient('localhost:9092')
        topic_partitions_ids = client.get_partition_ids_for_topic(b'two')
        end_offsets = consumer_2.end_offsets(topic_partitons_ids)
        committed = consumer_2.committed(topic_partitions_ids)

        if end_offsets - committed > 0:
        for msg_2 in consumer_2:
            consumer_1.pause()
            print('consumer_2 {0}'.format(msg_2.value)
            consumer_1.resume()

Но я застрял на линии

committed = consumer_2.committed(topic_partitions_ids)

с сообщением об ошибке:

раздел должен быть разделом с именем TopicPartition

Я верю, что есть простой способ сделать это, но для моей жизни я не могу найти выход. И я также думаю, что мне вообще не нужно останавливать customer_1.

Может кто-нибудь помочь?

Просто обратите внимание, это то, как я создал consumer_1 и consumer_2, на случай, если это уместно.

consumer_1 = KafkaConsumer('one',
                            group_id='one-group',
                            bootstrap_servers=['localhost:9092'],
                            value_deserializer=lambda x: loads(x.decode('utf-8')))

consumer_2 = KafkaConsumer('two',
                           group_id='two-group',
                           bootstrap_servers=['localhost:9092'],
                           value_deserializer=lambda x: loads(x.decode('utf-8')))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...