kafka- python: закрытие производителя kafka с таймаутом 0 vs inf secs - PullRequest
1 голос
/ 08 мая 2020

Я пытаюсь создать сообщения для Kafka topi c, используя kafka- python 2.0.1, используя python 2.7 (не могу использовать Python 3 из-за некоторых ограничений, связанных с рабочим местом)

Я создал класс, как показано ниже, в отдельном пакете, скомпилировал его и установил в виртуальной среде:

import json
from kafka import KafkaProducer


class KafkaSender(object):
    def __init__(self):
        self.producer = self.get_kafka_producer()

    def get_kafka_producer(self):
        return KafkaProducer(
            bootstrap_servers=['locahost:9092'],
            value_serializer=lambda x: json.dumps(x),
            request_timeout_ms=2000,
        )

    def send(self, data):
        self.producer.send("topicname", value=data)

Мой код драйвера выглядит примерно так:

from mypackage import KafkaSender

# driver code
data = {"a":"b"}
kafka_sender = KafkaSender()
kafka_sender.send(data)

Сценарий 1:
Я запускаю этот код, он работает нормально, без ошибок, но сообщение не отправляется в топ c. Я подтвердил это, так как смещение или лаг не увеличиваются в топи c. Кроме того, на стороне потребителя ничего не регистрируется.

Сценарий 2:
Прокомментирована / удалена инициализация производителя Kafka из метода __init__.
Я изменил отправку строка от self.producer.send("topicname", value=data) до self.get_kafka_producer().send("topicname", value=data), т.е. создание производителя kafka не заранее (во время инициализации класса), а прямо перед отправкой сообщения в topi c. И когда я запустил код, он работал отлично. Сообщение было опубликовано в топах c.

Я намерен использовать сценарий 1 - один раз создать производителя Kafka и использовать его несколько раз, а не создавать производителя Kafka каждый раз, когда я хочу отправлять сообщения. Таким образом, я могу создать миллионы объектов производителя Kafka, если мне нужно отправить миллионы сообщений.

Не могли бы вы помочь мне понять, почему производитель Kafka ведет себя таким образом.

ПРИМЕЧАНИЕ. Если я напишу код Kafka и код драйвера в одном файле, он будет работать нормально. Он не работает только тогда, когда я пишу код Kafka в отдельном пакете, компилирую его и импортирую в свой другой проект.

ЖУРНАЛЫ: https://www.diffchecker.com/dTtm3u2a

Обновление 1: 9 мая 2020 года, 17: 20:
Удалены журналы INFO из описания вопроса. Я включил уровень DEBUG, и вот разница между журналами отладки между первым и вторым сценариями

https://www.diffchecker.com/dTtm3u2a

Обновление 2: 9 мая 2020, 21: 28:
После дальнейшей отладки и просмотра исходного кода python -kafka я смог сделать вывод, что в сценарии 1 отправитель кафки был принудительно закрыт, в то время как в сценарии 2 отправитель кафки был закрыто изящно.

    def initiate_close(self):
        """Start closing the sender (won't complete until all data is sent)."""
        self._running = False
        self._accumulator.close()
        self.wakeup()

    def force_close(self):
        """Closes the sender without sending out any pending messages."""
        self._force_close = True
        self.initiate_close()

И это зависит от того, вызывается ли метод производителя kafka close() с таймаутом 0 (принудительное закрытие отправителя) или без тайм-аута (в этом случае таймаут принимает значение float('inf') и изящное закрытие отправителя.)

Метод close() производителя Kafka вызывается из метода __del__, который вызывается во время сборки мусора. Метод close(0) вызывается из метода, зарегистрированного с помощью atexit, который вызывается, когда интерпретатор завершает работу .
Вопрос: почему в сценарии 1 интерпретатор завершает работу?

...