Как я могу объединить последовательное и параллельное выполнение отложенных вызовов функций? - PullRequest
0 голосов
/ 07 февраля 2019

Я застрял в странном месте.У меня есть куча отложенных вызовов функций, которые я хочу выполнить в определенном порядке.Хотя параллельное выполнение тривиально:

res = client.compute([myfuncs])
res = client.gather(res)

Кажется, я не могу найти способ выполнить их последовательно, неблокирующим образом.

Вот минимальный пример:

import numpy as np
from time import sleep
from datetime import datetime

from dask import delayed
from dask.distributed import LocalCluster, Client


@delayed
def dosomething(name):
    res = {"name": name, "beg": datetime.now()}
    sleep(np.random.randint(10))
    res.update(rand=np.random.rand())
    res.update(end=datetime.now())
    return res


seq1 = [dosomething(name) for name in ["foo", "bar", "baz"]]
par1 = dosomething("whaat")
par2 = dosomething("ahem")
pipeline = [seq1, par1, par2]

Учитывая приведенный выше пример, я бы хотел запустить seq1, par1 и par2 параллельно, но составляющие seq1: "foo", "bar" и"baz", в последовательности.

1 Ответ

0 голосов
/ 07 февраля 2019

Вы можете определенно обмануть и добавить необязательную зависимость к вашей функции следующим образом:

@dask.delayed
def dosomething(name, *args):
     ...

Так что вы можете сделать задачи зависимыми друг от друга, даже если вы не используете один результат вследующий запуск функции:

inputs = ["foo", "bar", "baz"]
seq1 = [dosomething(inputs[0])]
for bit in inputs[1:]:
    seq1.append(dosomething(bit, seq1[-1]))

Кроме того, вы можете прочитать об интерфейсе «фьючерсов» распределенного планировщика, с помощью которого вы можете отслеживать ход выполнения задач в режиме реального времени.

...