почему dask не распараллеливает этот рабочий процесс? - PullRequest
0 голосов
/ 13 сентября 2018

У меня есть 2 очень простые функции:

import time

def sleepy(a=1):
    time.sleep(a)
    print(a)

def ending(*args):
    print(args)
    print('finished')

У меня также есть рабочий процесс dask, который использует следующие функции:

workflow = {'task_0': (sleepy, 1), 
            'task_1': (sleepy, 2), 
            'task_2': (sleepy, 3), 
            'ending': (ending, 'task_0', 'task_1', 'task_2')}

Этот рабочий процесс можно визуализировать так:

dask.visualize(workflow)

sleepy, sleepy, sleepy должны выполняться параллельно, но это не так.

Я жду 1 секунду, и она печатает 1 с sleepy(), затем я жду 2 секунды, и она печатает 2, затем я жду еще 3 секунды, и она печатает 3:

1
2
3
(None, None, None)
finished

Что я делаю не так?

Ответы [ 2 ]

0 голосов
/ 13 сентября 2018

Изменение dask.get( на dask.threaded.get( устранило мою проблему, но мне также очень понравился ответ mdurant.

0 голосов
/ 13 сентября 2018

Так я бы кодировал ваш рабочий процесс, и операции сна действительно выполняются параллельно

import dask.delayed
import time

@dask.delayed
def sleepy(a=1):
    time.sleep(a)
    print(a)

@dask.delayed
def ending(*args):
    print(args)
    print('finished')

d = ending(*[sleepy(i) for i in [1, 2, 3]])
d.compute()

Обратите внимание, что декоратор @ - это только синтаксическая корректность, вы также можете сделать dask.delayed(sleepy),и т.д.

...