Я пытаюсь понять, как я могу написать Кафке из Даск. Мы привязаны к использованию библиотеки Confluent. Dask пытается заставить производителя отправить его работнику, но из-за ленивого создания экземпляра происходит сбой с ошибкой. Мне удалось придумать следующее решение, но это похоже на взлом.
def get_producer(config):
w = get_worker()
if hasattr(w, 'producer'):
return w.producer
import confluent_kafka as ck
w.producer = getattr(ck, 'Producer')(config)
return w.producer
Обходной путь hasattr
создаст производителя на работнике dask при отправке. Этот код также связывает продюсера с рабочим, чтобы поддерживать соединение для потоковой передачи.
Я надеюсь, что у кого-то, кто является скорее экспертом Dask, чем у меня, есть несколько полезных советов.