Потребитель Python kafka не будет потреблять сообщения от производителя - PullRequest
0 голосов
/ 07 февраля 2019

так что я довольно новичок в Кафке.Я пытался запустить простого потребителя и производителя Kafka.когда я запускаю мой потребитель, он печатает привет прямо перед циклом for.Но ничего не печатается в цикле for, что наводит меня на мысль, что оно никогда не входит в цикл for, и потребитель не принимает сообщения от производителя.Я запускаю это в системе Linux.

Может кто-нибудь дать совет, что может быть не так с производителем или потребителем?Я показал мой код производителя и потребителя, которые представляют собой всего несколько строк кода.

Это мой производитель:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:2181',api_version=(1,0,1))
producer.send('MyFirstTopic1', 'Hello, World!')

Это мой потребитель:

from kafka import KafkaConsumer,KafkaProducer,TopicPartition,OffsetAndMetadata
consumer = KafkaConsumer(
 bootstrap_servers=['localhost:2181'],api_version=(1,0,1),
 group_id=None,
 enable_auto_commit=False,
 auto_offset_reset='smallest'
)
consumer.subscribe('MyFirstTopic1',0)
print("hello")
for message in consumer:
 print(message)

Так что при запуске моего продюсера это в конечном итоге выдает ошибку. Никто не знает, что это значит, и если это возможно, что не так.

File "producer.py", line 3, in <module>
    producer.send('MyFirstTopic1', 'Hello, World!')
  File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 543, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 664, in _wait_on_metadata
    "Failed to update metadata after %.1f secs." % max_wait)
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

1 Ответ

0 голосов
/ 07 февраля 2019

Похоже, вы используете неправильный хост в ваших клиентских конфигурациях.localhost:2181 обычно является сервером Zookeeper.

Чтобы ваши клиенты работали, вам нужно вместо bootstrap_servers указать имя хоста и порт брокера Kafka.Это localhost:9092 по умолчанию.

См. https://kafka -python.readthedocs.io / en / latest / apidoc / KafkaProducer.html

...