aiokafka ProducerЗакрытая ошибка при генерации сообщений асинхронно - PullRequest
1 голос
/ 14 октября 2019

Я использую aiokafka для асинхронной генерации сообщений. У меня есть Api, использующий django, который производит сообщения в очередь kafka. Работало нормально. Теперь, когда я преобразовал тот же API-интерфейс для использования сервера aiohttp, появляется следующая ошибка: -

aiokafka.errors.ProducerClosed: ProducerClosed

Первое сообщение успешно создается,Вышеуказанная ошибка появляется при получении 2-го сообщения.

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(
    loop=loop,
    bootstrap_servers="127.0.0.1:9092"
)
await producer.start()
response = await producer.send_and_wait(queue_name, msg)
await producer.stop()

Нет информации об этой ошибке в документации aiokafka. Пожалуйста, помогите.

Редактировать: Я обманываю этого продюсера среди гонщиков. Если я оставлю продюсера открытым, это вызовет какие-то проблемы? Когда продюсер будет закрыт автоматически?

1 Ответ

1 голос
/ 15 октября 2019

aiokafka.errors.ProducerClosed: ProducerClosed

Эта ошибка возникает при отправке сообщения закрытому производителю.

Если вы предоставите доступпроизводитель среди обработчиков, убедитесь, что вы не закрываете его после выдачи первого сообщения.

Редактировать: вы можете закрыть его в контексте очистки

async def kafka(app):
    await producer.start()
    yield
    await producer.stop()


app.cleanup_ctx.append(kafka)

Без него все соединения будут пытаться закрыть

...