Я пытаюсь создать сообщения для Kafka topi c, используя kafka- python 2.0.1, используя python 2.7 (не могу использовать Python 3 из-за некоторых ограничений, связанных с рабочим местом)
Я создал класс, как показано ниже, в отдельном пакете, скомпилировал его и установил в виртуальной среде:
import json
from kafka import KafkaProducer
class KafkaSender(object):
def __init__(self):
self.producer = self.get_kafka_producer()
def get_kafka_producer(self):
return KafkaProducer(
bootstrap_servers=['locahost:9092'],
value_serializer=lambda x: json.dumps(x),
request_timeout_ms=2000,
)
def send(self, data):
self.producer.send("topicname", value=data)
Мой код драйвера выглядит примерно так:
from mypackage import KafkaSender
# driver code
data = {"a":"b"}
kafka_sender = KafkaSender()
kafka_sender.send(data)
Сценарий 1:
Я запускаю этот код, он работает нормально, без ошибок, но сообщение не отправляется в топ c. Я подтвердил это, так как смещение или лаг не увеличиваются в топи c. Кроме того, на стороне потребителя ничего не регистрируется.
Сценарий 2:
Прокомментирована / удалена инициализация производителя Kafka из метода __init__
.
Я изменил отправку строка от self.producer.send("topicname", value=data)
до self.get_kafka_producer().send("topicname", value=data)
, т.е. создание производителя kafka не заранее (во время инициализации класса), а прямо перед отправкой сообщения в topi c. И когда я запустил код, он работал отлично. Сообщение было опубликовано в топах c.
Я намерен использовать сценарий 1 - один раз создать производителя Kafka и использовать его несколько раз, а не создавать производителя Kafka каждый раз, когда я хочу отправлять сообщения. Таким образом, я могу создать миллионы объектов производителя Kafka, если мне нужно отправить миллионы сообщений.
Не могли бы вы помочь мне понять, почему производитель Kafka ведет себя таким образом.
ПРИМЕЧАНИЕ. Если я напишу код Kafka и код драйвера в одном файле, он будет работать нормально. Он не работает только тогда, когда я пишу код Kafka в отдельном пакете, компилирую его и импортирую в свой другой проект.
ЖУРНАЛЫ: https://www.diffchecker.com/dTtm3u2a
Обновление 1: 9 мая 2020 года, 17: 20:
Удалены журналы INFO из описания вопроса. Я включил уровень DEBUG, и вот разница между журналами отладки между первым и вторым сценариями
https://www.diffchecker.com/dTtm3u2a
Обновление 2: 9 мая 2020, 21: 28:
После дальнейшей отладки и просмотра исходного кода python -kafka я смог сделать вывод, что в сценарии 1 отправитель кафки был принудительно закрыт, в то время как в сценарии 2 отправитель кафки был закрыто изящно.
def initiate_close(self):
"""Start closing the sender (won't complete until all data is sent)."""
self._running = False
self._accumulator.close()
self.wakeup()
def force_close(self):
"""Closes the sender without sending out any pending messages."""
self._force_close = True
self.initiate_close()
И это зависит от того, вызывается ли метод производителя kafka close()
с таймаутом 0 (принудительное закрытие отправителя) или без тайм-аута (в этом случае таймаут принимает значение float('inf')
и изящное закрытие отправителя.)
Метод close()
производителя Kafka вызывается из метода __del__
, который вызывается во время сборки мусора. Метод close(0)
вызывается из метода, зарегистрированного с помощью atexit
, который вызывается, когда интерпретатор завершает работу .
Вопрос: почему в сценарии 1 интерпретатор завершает работу?