Пытаюсь создать сообщение на тему kafka для каждой итерации, но похоже, что в итоге я не отправляю сообщение пользователю - PullRequest
0 голосов
/ 11 января 2019

Невозможно записать сообщение в тему kafka (продюсер) при вызове kakfa производить класс с циклом.

Я очень новичок в Python и Kafka. Я пытаюсь написать программу на python для записи сообщений в тему Kafka и создания так, чтобы потребитель Kafka мог подписаться на эту тему, чтобы опубликовать сообщение.

Я не уверен, что отсутствует в моей программе, которая запрещает писать сообщения в теме.

Обратите внимание: я читаю файл JSON и использую цикл for, чтобы подготовить значение ключа. Затем присвойте его переменной и передайте эту переменную с помощью Kafka для arg для msg.

Прилагается программа производителя Kafka.

Ввод: Json_smpl.json

Содержимое файла:

{
"transaction":{
"Accnttype":"Saving"
,"Branch":"West"
,"id":"WS"
}
}

Программа:

from confluent_kafka import Producer
import json

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: {0}: {1}"
              .format(msg.value(), err.str()))
    else:
        print("Message produced: {0}".format(msg.value()))

p = Producer({'bootstrap.servers': 'localhost:9092'})
try:
    with open('json_smpl.json') as read_j:
        data = json.load(read_j)
        get_data = data.get("transactions")
    print(get_data)
    for i in get_data:
        a = list(get_data.items()[0])
        p.produce(topic='mytopic12', 'myvalue #{0}'.format(a), callback=acked)
except KeyboardInterrupt:
    pass
p.flush(1)

Ожидаемый результат: Сообщение (ключ и значение JSON) будет записано в тему kafka для каждой итерации в цикле.

Фактический результат: Нет сообщений в теме. таким образом, потребитель не получает никаких сообщений.

1 Ответ

0 голосов
/ 12 января 2019

В вашем файле нет ключа transactions и нет цикла, который нужно перебрать, поэтому ваш JSON не анализируется, и вы не перехватываете KeyError или ValueError

Начните с этого

p = Producer({'bootstrap.servers': 'localhost:9092'})
try:
    with open('json_smpl.json') as read_j:
        data = json.load(read_j).get("transaction")
        tosend = json.dumps(data)
        print("Ready to send : {}".format(tosend))
        p.produce(topic='mytopic12', tosend, callback=acked)
except:
    print("There was some error")
...