Я пытаюсь распараллелить вложенный цикл, используя дистрибутив dask, который выглядит следующим образом:
@dask.delayed
def delayed_a(e):
a = do_something_with(e)
return something
@dask.delayed
def delayed_b(element):
computations = []
for e in element:
computations.add(delayed_a(e))
b = dask.compute(*computations, scheduler='distributed',
num_workers=4)
return b
list = [some thousands of elements here]
computations = []
for element in list:
computations.append(delayed_b(element))
results = dask.compute(*computations, scheduler='distributed',
num_workers=4)
Как видите, я использую планировщик distributed
. Сначала я создаю список computations
с ленивой функцией delayed_b
, которая принимает в качестве аргумента один элемент из list
. Затем delayed_b
создает новый набор computations
, который вызывает функцию delayed_a
, и все выполняется в распределенном режиме. Этот псевдокод работает, но я обнаружил, что он быстрее, если delayed_a
там нет. Тогда мой вопрос - как правильно распределить параллельные циклы?
В конце истории я пытаюсь сделать следующее:
list = [some thousands of elements here]
for element in list:
for e in element:
do_something_with(e)
Буду очень признателен за любые предложения о наилучшем способе выполнения вложенных циклов с dask.distributed
.