Kafka Python, как отслеживать потребителя, запущенного в другом процессе - PullRequest
0 голосов
/ 08 апреля 2019

Я довольно новичок в Python и только начинаю работать с Kafka, так что извините мою терминологию, если я где-то ошибаюсь.

Итак, у меня есть веб-приложение на основе Django, куда я отправляю сообщения json черезПроизводитель Кафки в рамках одного процесса.Однако при прагматическом создании темы я также запускаю (подписываю) нового потребителя в отдельном процессе для этой конкретной темы.

#Consumer code snippet

 if topic_name is not None :
        #Create topic
        create_kafka_topic_instance(topic_name)
        #Initialize a consumer and subscribe to topic
        Process(target=init_kafka_consumer_instance, args=(topic_name))

def forgiving_json_deserializer(v):
    if v is None :
        return
    try:
        return json.loads(v.decode('utf-8'))
    except json.decoder.JSONDecodeError:
        import traceback
        print(traceback.format_exc())
        return None

def init_kafka_consumer_instance(topic, group_id=None):
    try:
        if topic is None:
            raise Exception("Invalid argument topic")
        comsumer = None
        comsumer = KafkaConsumer(topic, bootstrap_servers=[KAFKA_BROKER_URL], auto_offset_reset="earliest",
           urn comsumer
    except Exception as e:
        import traceback
        print(traceback.format_exc())
    return Noneurn comsumer
    except Exception as e:
        import traceback
        print(traceback.format_exc())
    return None

Фрагмент кода производителя

# assuming obj is a model instance
        serialized_obj = serializers.serialize('json', [ order, ])
        #send_message(topic_name,order)
        producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER_URL], value_serializer=lambda x: json.dumps(x).encode('utf-8'))
        x = producer.send("test", serialized_obj)
        producer.flush()

Теперь у меня есть несколько запросов, поэтому, если каким-то образом мое приложение (сервер) Django будет перезапущено, у меня все равно будет потребитель, слушающий эту тему.

Также у меня есть некоторые операторы печати для потребителя, которыми я являюсь.не удается увидеть в консоли моего сервера.

Тем не менее, при написании того же фрагмента кода (инициализация потребителя) в оболочке python, я могу видеть сообщения в выражениях print там, что означает, что мой Producer работает нормально.

1 Ответ

0 голосов
/ 08 апреля 2019

Kafka Server не зависит от вашего приложения Django (сервера). Но ваш Потребитель - это да.

Таким образом, ваша тема все еще жива на сервере Kafka (если сервер kafka умрет, это другая история), но ваш потребитель перезапустится с вашим приложением.

Так что, если вы хотите, чтобы ваш потребитель работал хорошо, сделайте его рабочим, который работает параллельно с вашим приложением и не будет перезапущен, когда ваше приложение не работает

...