У меня есть приложение Django, которое имеет интеграцию с Kafka для обработки некоторых заказов.Темы в очереди Kafka создаются динамически, поэтому пользователи также должны подписываться динамически.Теперь, когда я инициализирую потребителя, он блокирует основной поток, поэтому мне нужно запустить потребителя в фоновом потоке, но я не могу видеть какие-либо операторы печати, поэтому я не уверен, инициализирован ли потребитель, также если этоправильный подход для этого?
def kafka_consumer(topic) :
try :
if topic is None :
raise Exception("Topic is none, unable to initialize kafka consumer")
conf = {'bootstrap.servers': "localhost:9092", 'group.id': 'test', 'session.timeout.ms': 6000,
'auto.offset.reset': 'earliest'}
c = Consumer(conf)
print("Subscribing consumer to topic ",topic[0])
c.subscribe(topic)
# Read messages from Kafka, print to stdout
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
else:
sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
(msg.topic(), msg.partition(), msg.offset(),
str(msg.key())))
try :
print(json.loads(msg.value()))
print("---------------------------------")
objs = serializers.deserialize("json", msg.value())
for obj in objs :
print(obj)
print(obj.object)
except Exception as e :
import traceback
print(traceback.format_exc())
except Exception as e:
import traceback
print(traceback.format_exc())
finally:
c.close()
except Exception as e:
import traceback
print(traceback.format_exc())
Ниже описано, как я вызываю функцию:
try :
topic = []
topic.append(offer.offering_order_id)
background_thread = Thread(target=kafka_consumer, args=(topic))
background_thread.start()
except Exception as e :
import traceback
print(traceback.format_exc())
Может кто-нибудь помочь мне с архитектурой, пожалуйста?