Я новичок в Кафке. Я пытался запустить пример кода, указанного в https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro-cli.py
.
В соответствии с кодом, как работает производитель и потребитель:
def produce(topic, conf):
"""
Produce User records
"""
from confluent_kafka.avro import AvroProducer
producer = AvroProducer(conf, default_value_schema=record_schema)
print("Producing user records to topic {}. ^c to exit.".format(topic))
while True:
# Instantiate new User, populate fields, produce record, execute callbacks.
record = User()
try:
record.name = input("Enter name: ")
record.favorite_number = int(input("Enter favorite number: "))
record.favorite_color = input("Enter favorite color: ")
# The message passed to the delivery callback will already be serialized.
# To aid in debugging we provide the original object to the delivery callback.
producer.produce(topic=topic, value=record.to_dict(),
callback=lambda err, msg, obj=record: on_delivery(err, msg, obj))
# Serve on_delivery callbacks from previous asynchronous produce()
producer.poll(0)
except KeyboardInterrupt:
break
except ValueError:
print("Invalid input, discarding record...")
continue
print("\nFlushing records...")
producer.flush()
def consume(topic, conf):
"""
Consume User records
"""
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
print("Consuming user records from topic {} with group {}. ^c to exit.".format(topic, conf["group.id"]))
c = AvroConsumer(conf, reader_value_schema=record_schema)
c.subscribe([topic])
while True:
try:
msg = c.poll(1)
# There were no messages on the queue, continue polling
if msg is None:
print("There are no messages in the queue.")
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
record = User(msg.value())
print("name: {}\n\tfavorite_number: {}\n\tfavorite_color: {}\n".format(
record.name, record.favorite_number, record.favorite_color))
except SerializerError as e:
# Report malformed record, discard results, continue polling
print("Message deserialization failed {}".format(e))
continue
except KeyboardInterrupt:
break
print("Shutting down consumer..")
c.close()
Когда я пытаюсь использовать сообщение, очередь кажется пустой.
Я получаю следующий вывод:
Consuming user records from topic example_avro with group example_avro. ^c to exit.
There are no messages in the queue.
Это привело меня к подозрению, что я не произвел сообщение в первую очередь. Я использую те же bootstrap серверы, схему реестра и топи c. Может ли кто-нибудь помочь мне понять, что я не правильно понимаю?