Должен ли Kafka Consumer быть частью Django Web Layer или отдельным сервисом? - PullRequest
0 голосов
/ 22 апреля 2019

У меня есть приложение Django, которое имеет интеграцию с Kafka для обработки некоторых заказов.Темы в очереди Kafka создаются динамически, поэтому пользователи также должны подписываться динамически.Теперь, когда я инициализирую потребителя, он блокирует основной поток, поэтому мне нужно запустить потребителя в фоновом потоке, но я не могу видеть какие-либо операторы печати, поэтому я не уверен, инициализирован ли потребитель, также если этоправильный подход для этого?

def kafka_consumer(topic) :
  try :
    if topic is None :
        raise Exception("Topic is none, unable to initialize kafka consumer")
    conf = {'bootstrap.servers': "localhost:9092", 'group.id': 'test', 'session.timeout.ms': 6000,
    'auto.offset.reset': 'earliest'}
    c = Consumer(conf)
    print("Subscribing consumer to topic ",topic[0])
    c.subscribe(topic)
    # Read messages from Kafka, print to stdout
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                    (msg.topic(), msg.partition(), msg.offset(),
                                    str(msg.key())))
                try :
                    print(json.loads(msg.value()))
                    print("---------------------------------")
                    objs = serializers.deserialize("json", msg.value())
                    for obj in objs :
                        print(obj)
                        print(obj.object)
                except Exception as e :
                    import traceback
                    print(traceback.format_exc())
    except Exception as e:
        import traceback
        print(traceback.format_exc())
    finally:
        c.close()
except Exception as e:
    import traceback
    print(traceback.format_exc())

Ниже описано, как я вызываю функцию:

try :
        topic = []
        topic.append(offer.offering_order_id)
        background_thread = Thread(target=kafka_consumer, args=(topic))
        background_thread.start()
    except Exception as e :
        import traceback
        print(traceback.format_exc())

Может кто-нибудь помочь мне с архитектурой, пожалуйста?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...