Простой производитель Кафки, не отправляющий сообщения и не выдающий ошибку - PullRequest
1 голос
/ 28 июня 2019

Я пишу простой продюсер, я просто хочу отправить необработанные данные в тему. по какой-то причине мне нужно указать сериализатор, который преобразует сообщение в json, затем в utf-8, а затем отправляет сообщение json ..

этот код не работает (без ошибок, но нечего использовать в теме)

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = "my_new_topic5"

producer.send(topic, b'test message')

этот код работает

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                     value_serializer=lambda x:
                     dumps(x).encode('utf-8'))

for e in range(2):
    data = {'number': e}
    producer.send('numtest', value=data)
    sleep(5)

1 Ответ

1 голос
/ 28 июня 2019

Попробуйте также позвонить producer.flush() после send() и producer.close() до завершения вашей программы.Следующее должно сделать трюк:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = "my_new_topic5"

producer.send(topic, b'test message')
producer.flush()
producer.close()
...