Я установил кластер Dask
и с радостью отправляю ему базовые c Prefect
потоки. Теперь я хочу сделать что-то более интересное и взять пользовательский образ docker с моей python библиотекой и выполнить потоки / задачи в кластере dask.
Я предполагал, что могу покинуть кластер dask ( scheduler и worker), как и в их собственной среде python (после проверки все различные библиотеки передачи сообщений имеют соответствующие версии везде). Другими словами, я не ожидаю, что мне понадобится добавлять свою библиотеку на эти машины, если Flow выполняется в моем пользовательском storage
. Однако либо я неправильно настроил хранилище, либо предполагать вышеуказанное небезопасно. Другими словами, возможно, при травлении объектов в моей пользовательской библиотеке кластер Dask действительно должен знать о моей python библиотеке. Предположим, у меня есть какая-то общая c python библиотека с именем data
...
import prefect
from prefect.engine.executors import DaskExecutor
#see https://docs.prefect.io/api/latest/environments/storage.html#docker
from prefect.environments.storage import Docker
#option 1
storage = Docker(registry_url="gcr.io/my-project/",
python_dependencies=["some-extra-public-package"],
dockerfile="/path/to/Dockerfile")
#this is the docker build and register workflow!
#storage.build()
#or option 2, specify image directly
storage = Docker(
registry_url="gcr.io/my-project/", image_name="my-image", image_tag="latest"
)
#storage.build()
def get_tasks():
return [
"gs://path/to/task.yaml"
]
@prefect.task
def run_task(uri):
#fails because this data needs to be pickled ??
from data.tasks import TaskBase
task = TaskBase.from_task_uri(uri)
#task.run()
return "done"
with prefect.Flow("dask-example",
storage = storage) as flow:
#chain stuff...
result = run_task.map(uri=get_tasks())
executor = DaskExecutor(address="tcp://127.0.01:8080")
flow.run(executor=executor)
Может ли кто-нибудь объяснить, как / если этот тип рабочего процесса на основе docker должен работать?