получать сообщения в топи c, используя kafka- python - PullRequest
0 голосов
/ 01 марта 2020

Я написал скрипт python с использованием библиотеки kafka-python, которая записывает и читает сообщения в kafka. Я пишу сообщения без проблем; Я могу получить их, используя kafka консольные инструменты. Но я не могу прочитать их, используя мой python скрипт. У меня есть потребитель для моего потребителя, который останавливается в первой строке итерации и никогда не возвращается. Вот мой код:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="localhost:9092"),
    value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)

for msg in consumer:
    print(type(msg))

Потребитель создан и подписан полностью; Я вижу, что my-topic указан в списке topi c его свойства _client.

Есть идеи?

1 Ответ

1 голос
/ 01 марта 2020

По умолчанию kafka python начинается с последнего смещения, ie будет читать только новые сообщения. Один из подходов заключается в чтении с самого начала, или альтернативный подход заключается в продолжении опроса topi c в бесконечном l oop, как показано в следующем коде:

while True:
    try:
        records = consumer.poll(60 * 1000) # timeout in millis , here set to 1 min

        record_list = []
        for tp, consumer_records in records.items():
            for consumer_record in consumer_records:
                record_list.append(consumer_record.value)
        print(record_list) # record_list will be list of dictionaries

Edit

Для чтения с самого начала нам нужно добавить auto_offset_reset=earliest ранее при создании объекта-потребителя

consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="localhost:9092"),
    value_deserializer=lambda v: json.dumps(v).encode("utf-8"),
    auto_offset_reset='earliest')

Дайте мне знать, если это поможет !!

...