Пишите Кафке от работника Dask - PullRequest
2 голосов
/ 19 марта 2020

Я пытаюсь понять, как я могу написать Кафке из Даск. Мы привязаны к использованию библиотеки Confluent. Dask пытается заставить производителя отправить его работнику, но из-за ленивого создания экземпляра происходит сбой с ошибкой. Мне удалось придумать следующее решение, но это похоже на взлом.

def get_producer(config):
    w = get_worker()
    if hasattr(w, 'producer'):
        return w.producer

    import confluent_kafka as ck
    w.producer = getattr(ck, 'Producer')(config)
    return w.producer

Обходной путь hasattr создаст производителя на работнике dask при отправке. Этот код также связывает продюсера с рабочим, чтобы поддерживать соединение для потоковой передачи.

Я надеюсь, что у кого-то, кто является скорее экспертом Dask, чем у меня, есть несколько полезных советов.

1 Ответ

1 голос
/ 28 марта 2020

То, что вы делаете, немного хакерское, да, но это тоже хорошо.

Вы можете также подумать о воссоздании производителя в каждой задаче, записи данных и закрытии этого производителя. , Если создание производителя не займет слишком много времени относительно количества времени, которое требуется для записи раздела данных, тогда это может быть достойным решением. Он немного менее эффективен, но, вероятно, более надежен / безопасен / зрел.

...