Я использую aiokafka для асинхронной генерации сообщений. У меня есть Api, использующий django, который производит сообщения в очередь kafka. Работало нормально. Теперь, когда я преобразовал тот же API-интерфейс для использования сервера aiohttp, появляется следующая ошибка: -
aiokafka.errors.ProducerClosed: ProducerClosed
Первое сообщение успешно создается,Вышеуказанная ошибка появляется при получении 2-го сообщения.
loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(
loop=loop,
bootstrap_servers="127.0.0.1:9092"
)
await producer.start()
response = await producer.send_and_wait(queue_name, msg)
await producer.stop()
Нет информации об этой ошибке в документации aiokafka. Пожалуйста, помогите.
Редактировать: Я обманываю этого продюсера среди гонщиков. Если я оставлю продюсера открытым, это вызовет какие-то проблемы? Когда продюсер будет закрыт автоматически?