Невозможно создать какие-либо сообщения Кафки - PullRequest
0 голосов
/ 17 апреля 2020

Я новичок в Кафке. Я пытался запустить пример кода, указанного в https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro-cli.py.

В соответствии с кодом, как работает производитель и потребитель:

def produce(topic, conf):
    """
        Produce User records
    """

    from confluent_kafka.avro import AvroProducer

    producer = AvroProducer(conf, default_value_schema=record_schema)

    print("Producing user records to topic {}. ^c to exit.".format(topic))
    while True:
        # Instantiate new User, populate fields, produce record, execute callbacks.
        record = User()
        try:
            record.name = input("Enter name: ")
            record.favorite_number = int(input("Enter favorite number: "))
            record.favorite_color = input("Enter favorite color: ")

            # The message passed to the delivery callback will already be serialized.
            # To aid in debugging we provide the original object to the delivery callback.
            producer.produce(topic=topic, value=record.to_dict(),
                             callback=lambda err, msg, obj=record: on_delivery(err, msg, obj))
            # Serve on_delivery callbacks from previous asynchronous produce()
            producer.poll(0)
        except KeyboardInterrupt:
            break
        except ValueError:
            print("Invalid input, discarding record...")
            continue

    print("\nFlushing records...")
    producer.flush()

def consume(topic, conf):
    """
        Consume User records
    """
    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka.avro.serializer import SerializerError

    print("Consuming user records from topic {} with group {}. ^c to exit.".format(topic, conf["group.id"]))

    c = AvroConsumer(conf, reader_value_schema=record_schema)
    c.subscribe([topic])

    while True:
        try:
            msg = c.poll(1)

            # There were no messages on the queue, continue polling
            if msg is None:
                print("There are no messages in the queue.")
                continue

            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue

            record = User(msg.value())
            print("name: {}\n\tfavorite_number: {}\n\tfavorite_color: {}\n".format(
                record.name, record.favorite_number, record.favorite_color))
        except SerializerError as e:
            # Report malformed record, discard results, continue polling
            print("Message deserialization failed {}".format(e))
            continue
        except KeyboardInterrupt:
            break

    print("Shutting down consumer..")
    c.close()

Когда я пытаюсь использовать сообщение, очередь кажется пустой.

Я получаю следующий вывод:

Consuming user records from topic example_avro with group example_avro. ^c to exit.
There are no messages in the queue.

Это привело меня к подозрению, что я не произвел сообщение в первую очередь. Я использую те же bootstrap серверы, схему реестра и топи c. Может ли кто-нибудь помочь мне понять, что я не правильно понимаю?

...