Я создал тему с помощью этой команды
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic replica
Затем я запускаю эту команду
bin/kafka-console-producer.sh --broker-list localhost:9093 localhost:9094 --topic replica
, и я могу получить сообщение для потребителя с помощью этой команды
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic replica
Но я не могу сделать то же самое в Python
В Python я устанавливаю это на стороне производителя
producer = KafkaProducer(bootstrap_servers=['localhost:9093', 'localhost:9094'],
value_serializer=lambda x:
dumps(x).encode('utf-8'))
producer.send('replica', value=data_obj)
, а на стороне потребителя я устанавливаю это
from pprint import pprint
def subscriber(topic):
consumer = KafkaConsumer(
topic,
bootstrap_servers=['localhost:9093', 'localhost:9094'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
for msg in consumer:
pprint(msg)
if __name__ == '__main__':
subscriber('replica')
В чем проблема?Почему я не могу использовать данные?