Невозможно записать сообщение в тему kafka (продюсер) при вызове kakfa производить класс с циклом.
Я очень новичок в Python и Kafka. Я пытаюсь написать программу на python для записи сообщений в тему Kafka и создания так, чтобы потребитель Kafka мог подписаться на эту тему, чтобы опубликовать сообщение.
Я не уверен, что отсутствует в моей программе, которая запрещает писать сообщения в теме.
Обратите внимание: я читаю файл JSON и использую цикл for, чтобы подготовить значение ключа. Затем присвойте его переменной и передайте эту переменную с помощью Kafka для arg для msg.
Прилагается программа производителя Kafka.
Ввод: Json_smpl.json
Содержимое файла:
{
"transaction":{
"Accnttype":"Saving"
,"Branch":"West"
,"id":"WS"
}
}
Программа:
from confluent_kafka import Producer
import json
def acked(err, msg):
if err is not None:
print("Failed to deliver message: {0}: {1}"
.format(msg.value(), err.str()))
else:
print("Message produced: {0}".format(msg.value()))
p = Producer({'bootstrap.servers': 'localhost:9092'})
try:
with open('json_smpl.json') as read_j:
data = json.load(read_j)
get_data = data.get("transactions")
print(get_data)
for i in get_data:
a = list(get_data.items()[0])
p.produce(topic='mytopic12', 'myvalue #{0}'.format(a), callback=acked)
except KeyboardInterrupt:
pass
p.flush(1)
Ожидаемый результат: Сообщение (ключ и значение JSON) будет записано в тему kafka для каждой итерации в цикле.
Фактический результат: Нет сообщений в теме. таким образом, потребитель не получает никаких сообщений.