Я довольно новичок в Python и начинаю работать с Kafka.Итак, я настроил брокера Kafka и пытаюсь установить с ним связь, используя confluent-kafka .Я был в состоянии создавать и потреблять простые сообщения, используя его, однако у меня есть некоторые объекты django, которые мне нужно сериализовать и отправить в ti kafka.
Ранее я использовал kafka-python , на котором я был в состоянии отправлять и принимать сообщения JSON, однако у меня были некоторые странные проблемы это.
# Producer.py
def send_message(topic,message) :
try :
try :
p.produce(topic,message,callback=delivery_callback)
except BufferError as b :
sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %len(p))
# Serve delivery callback queue.
# NOTE: Since produce() is an asynchronous API this poll() call
# will most likely not serve the delivery callback for the
# last produce()d message.
p.poll(0)
# Wait until all messages have been delivered
sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()
except Exception as e :
import traceback
print(traceback.format_exc())
# Consumer.py
conf = {'bootstrap.servers': "localhost:9092", 'group.id': 'test', 'session.timeout.ms': 6000,
'auto.offset.reset': 'earliest'}
c = Consumer(conf)
c.subscribe(["mykafka"])
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
else:
sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
(msg.topic(), msg.partition(), msg.offset(),
str(msg.key())))
print(msg.value())
except Exception as e:
import traceback
print(traceback.format_exc())
finally:
c.close()
Я сериализую свои объекты модели django следующим образом:
from django.core import serializers
# assuming obj is a model instance
serialized_obj = serializers.serialize('json', [ obj, ])
Итак, какие изменения мне нужно внести в моего производителя и потребителя, чтобы произвестии cosume json сообщения?