Как использовать индивидуальное хранилище docker в потоках префектов? - PullRequest
2 голосов
/ 21 июня 2020

Я установил кластер Dask и с радостью отправляю ему базовые c Prefect потоки. Теперь я хочу сделать что-то более интересное и взять пользовательский образ docker с моей python библиотекой и выполнить потоки / задачи в кластере dask.

Я предполагал, что могу покинуть кластер dask ( scheduler и worker), как и в их собственной среде python (после проверки все различные библиотеки передачи сообщений имеют соответствующие версии везде). Другими словами, я не ожидаю, что мне понадобится добавлять свою библиотеку на эти машины, если Flow выполняется в моем пользовательском storage. Однако либо я неправильно настроил хранилище, либо предполагать вышеуказанное небезопасно. Другими словами, возможно, при травлении объектов в моей пользовательской библиотеке кластер Dask действительно должен знать о моей python библиотеке. Предположим, у меня есть какая-то общая c python библиотека с именем data ...

import prefect    
from prefect.engine.executors import DaskExecutor
#see https://docs.prefect.io/api/latest/environments/storage.html#docker
from prefect.environments.storage import Docker

#option 1
storage = Docker(registry_url="gcr.io/my-project/",
                 python_dependencies=["some-extra-public-package"],
                 dockerfile="/path/to/Dockerfile")
#this is the docker build and register workflow!
#storage.build()

#or option 2, specify image directly
storage = Docker(
        registry_url="gcr.io/my-project/", image_name="my-image", image_tag="latest"
    )

#storage.build()

def get_tasks():
    return [
        "gs://path/to/task.yaml"
           ]

@prefect.task
def run_task(uri):
    #fails because this data needs to be pickled ??
    from data.tasks import TaskBase
    task =  TaskBase.from_task_uri(uri)
    #task.run()
    return "done"

with prefect.Flow("dask-example",
                 storage = storage) as flow:
    #chain stuff...
    result =  run_task.map(uri=get_tasks())

executor = DaskExecutor(address="tcp://127.0.01:8080")
flow.run(executor=executor)

Может ли кто-нибудь объяснить, как / если этот тип рабочего процесса на основе docker должен работать?

1 Ответ

3 голосов
/ 22 июня 2020

Вашим работникам dask потребуется доступ к тем же библиотекам python, которые используются в ваших задачах. Самый простой способ добиться этого - запустить ваши рабочие процессы, используя тот же образ, что и ваш Flow. Вы можете сделать это вручную или использовать что-то вроде DaskCloudProviderEnvironment , которое автоматически создаст недолговечные кластеры Dask для каждого потока, используя тот же образ.

...