так что я довольно новичок в Кафке.Я пытался запустить простого потребителя и производителя Kafka.когда я запускаю мой потребитель, он печатает привет прямо перед циклом for.Но ничего не печатается в цикле for, что наводит меня на мысль, что оно никогда не входит в цикл for, и потребитель не принимает сообщения от производителя.Я запускаю это в системе Linux.
Может кто-нибудь дать совет, что может быть не так с производителем или потребителем?Я показал мой код производителя и потребителя, которые представляют собой всего несколько строк кода.
Это мой производитель:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:2181',api_version=(1,0,1))
producer.send('MyFirstTopic1', 'Hello, World!')
Это мой потребитель:
from kafka import KafkaConsumer,KafkaProducer,TopicPartition,OffsetAndMetadata
consumer = KafkaConsumer(
bootstrap_servers=['localhost:2181'],api_version=(1,0,1),
group_id=None,
enable_auto_commit=False,
auto_offset_reset='smallest'
)
consumer.subscribe('MyFirstTopic1',0)
print("hello")
for message in consumer:
print(message)
Так что при запуске моего продюсера это в конечном итоге выдает ошибку. Никто не знает, что это значит, и если это возможно, что не так.
File "producer.py", line 3, in <module>
producer.send('MyFirstTopic1', 'Hello, World!')
File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 543, in send
self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 664, in _wait_on_metadata
"Failed to update metadata after %.1f secs." % max_wait)
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.