Как потреблять данные от нескольких брокеров Кафки в Python? - PullRequest
0 голосов
/ 20 мая 2019

Я создал тему с помощью этой команды

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic replica

Затем я запускаю эту команду

bin/kafka-console-producer.sh --broker-list localhost:9093 localhost:9094 --topic replica

, и я могу получить сообщение для потребителя с помощью этой команды

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic replica

Но я не могу сделать то же самое в Python

В Python я устанавливаю это на стороне производителя

producer = KafkaProducer(bootstrap_servers=['localhost:9093', 'localhost:9094'],
                         value_serializer=lambda x:
                         dumps(x).encode('utf-8'))

producer.send('replica', value=data_obj)

, а на стороне потребителя я устанавливаю это

from pprint import pprint
def subscriber(topic):

    consumer = KafkaConsumer(
    topic,
     bootstrap_servers=['localhost:9093', 'localhost:9094'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: loads(x.decode('utf-8')))

    for msg in consumer:

        pprint(msg)


if __name__ == '__main__':


   subscriber('replica')

В чем проблема?Почему я не могу использовать данные?

1 Ответ

0 голосов
/ 21 мая 2019

Также добавьте localhost: 9092. Это ваш сервер начальной загрузки и попробуйте заменить --replication-factor 3 следующим образом

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic replica
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...