У меня есть простой рабочий процесс DASK.когда я его распечатываю, это выглядит так:
workflow = {
'a_task': (<function a_func at 0x7f1dc5ded598>,),
'b_task': (<function b_func at 0x7f1dc5ded620>,),
'c_task': (<function c_func at 0x7f1ddb07aea0>,),
'd_task': (<function d_func at 0x7f1dc5ded6a8>,),
'workflow_end_task': (<function aggregate_func at 0x7f1dc5ded730>,
'a_task', 'b_task', 'c_task', 'd_task')
}
Оказывается, b_func
- это небольшая функция с циклом for, которая выполняет около 1000 итераций и занимает около часа.в основном это выглядит так:
def b_func(args...):
data = []
for this in that:
data.append(...)
return data
Это цикл for не обязательно должен быть выполнен по порядку.Это можно сделать параллельно.
Итак, вопрос: как мне справиться с этим?Должен ли я преобразовать этот цикл for в рабочий процесс и сделать еще один вызов dask внутри b_func
?или я должен вытащить этот процесс и расширить исходный рабочий процесс?
По сути, могу ли я вкладывать рабочие процессы dask или это плохая идея?
Кроме того, вы должны знать, что я использую from dask.distributed import Client
и Client.get
для распределения рабочего процесса по всему кластеру компьютеров.Я не знаю, усложняет ли это что-то за пределами dask.threaded.get
, но, возможно, это что-то меняет.Я предполагаю, что это означает, что одному из сотрудников workers
потребуется настроить новый планировщик и рабочих на всех компьютерах кластера, а затем передать им свой рабочий процесс.Может быть, idk.
Кто-нибудь имел дело с этой проблемой раньше?