Я пытаюсь создать простого производителя Kafka на основе confluent_kafka .Мой код следующий:
#!/usr/bin/env python
from confluent_kafka import Producer
import json
def delivery_report(err, msg):
"""Called once for each message produced to indicate delivery result.
Triggered by poll() or flush().
see https://github.com/confluentinc/confluent-kafka-python/blob/master/README.md"""
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(
msg.topic(), msg.partition()))
class MySource:
"""Kafka producer"""
def __init__(self, kafka_hosts, topic):
"""
:kafka_host list(str): hostnames or 'host:port' of Kafka
:topic str: topic to produce messages to
"""
self.topic = topic
# see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
config = {
'metadata.broker.list': ','.join(kafka_hosts),
'group.id': 'mygroup',
}
self.producer = Producer(config)
@staticmethod
def main():
topic = 'my-topic'
message = json.dumps({
'measurement': [1, 2, 3]})
mys = MySource(['kafka'], topic)
mys.producer.produce(
topic, message, on_delivery=delivery_report)
mys.producer.flush()
if __name__ == "__main__":
MySource.main()
При первом использовании темы (здесь: «my-topic») Кафка реагирует на «Автоматическое создание темы my-topic с 1 разделом и коэффициентом репликации».1 успешно (kafka.server.KafkaApis) ".Тем не менее, функция обратного вызова (on_delivery=delivery_report
) никогда не вызывается, и она висит на flush()
(она завершается, если я устанавливаю тайм-аут для сброса) ни в первый, ни в последующий раз.Логи Кафки ничего не показывают, если я использую существующую тему.