Я использую kafka-python в проекте django. Инициализировал производителя в файле settings.py. Вызывая02.send не отправляет сообщения.
Раньше мой продюсер каждый раз инициализировался в функции, и для отправки сообщений я использовал vendor.send (). Get (timeout = 1), и он работал нормально. Теперь я изменил инициализацию на файл настроек и удалил .get()
во время вызова send
, и он не работает. Старый рабочий код ниже:
В module_x.py
:
from kafka import KafkaProducer
def my_func():
KAFKA_CONNECTION = KafkaProducer(bootstrap_servers=CONFIGS.KAFKA_URL)
producer = KAFKA_CONNECTION
producer.send(topic, key=key, value=json.dumps(data)).get(timeout=1)
Изменен код ниже: теперь изменена инициализация на файл настроек, чтобы избежать инициализации при каждом вызове my_func
.
В settings.py
:
KAFKA_CONNECTION = KafkaProducer(bootstrap_servers=CONFIGS.KAFKA_URL)
В module_x.py
:
from django.conf import settings
def my_func():
producer = settings.KAFKA_CONNECTION
producer.send(topic, key=key, value=json.dumps(data))
Обратите внимание, что я даже пробовал с .get(timeout=1)
на send
, но получал Kafka.TiemoutError
Нужно ли использовать producer.flush()
после producer.send
или использовать linger_ms
во время инициализации.