Как составлять задачи в dask-распределенных - PullRequest
0 голосов
/ 21 октября 2018

Я пытаюсь запустить параллельный цикл joblib внутри многопоточного распределенного кластера (см. Ниже причину), но не могу получить никакого ускорения из-за GIL-блокировки.Вот пример:

def task(x):
    """ Sample single-process task that takes between 2 and 5 seconds """
    import time
    import random
    dt = random.uniform(2,5)
    time.sleep(dt)
    return x+dt

def composite_task(np=8):
    """ Composite task that runs multiple single-process runs in parallel """
    from functools import partial
    from joblib import Parallel, delayed, parallel_backend
    with parallel_backend('loky', n_jobs=np):
        out=Parallel()(delayed(task)(i) for i in list(range(0, np)))
    return out

Задача с одним процессором в среднем занимает 3,5 секунды

%timeit -n7 -r1 task(0)
3.61 s ± 0 ns per loop (mean ± std. dev. of 1 run, 7 loops each)

Joblib работает, как и ожидалось, 8 задач занимают не больше, чем одна самая длинная

%timeit -n1 -r1 composite_task(8)
5.03 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

Однако, когда я пытаюсь запустить этот код внутри dask LocalCluster с 8 потоками, я не получаю никакого ускорения

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=1, threads_per_worker=8)
client = Client(cluster)

%timeit -n1 -r1 client.submit(composite_task,8).result()
25.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

Возможно, я неправильно понимаю, как работает GIL.Пожалуйста помоги.Полный блокнот можно посмотреть здесь:

http://nbviewer.jupyter.org/gist/aeantipov/6d670e13cd503741e9ef5b0299719a8e


Причина, по которой стоит попытаться это сделать, заключается в том, что необходимо решить> 10 тыс. Задач с заблокированным GIL примерно на 50 узлах с 32 процессорами.Кластер dask-jobqueue легко создать с 50 рабочими * 32 потоками, но не с 1600 рабочими.И, к сожалению, поскольку GIL заблокирован, использование этого примера http://matthewrocklin.com/blog/work/2018/06/26/dask-scaling-limits не дает значительного ускорения при работе 50 рабочих.


dask                      0.19.1                
dask-core                 0.19.1                
dask-jobqueue             0.3.0             
python                    3.7.0
distributed               1.23.1

1 Ответ

0 голосов
/ 22 октября 2018

Я бы просто использовал решение dask-joblib

cluster = LocalCluster()
client = Client(cluster)

with joblib.parallel_backend('dask'):
    out=Parallel()(delayed(task)(i) for i in range(0, np))

Ваши опасения по поводу GIL здесь не применимы.Ваша функция вызывает sleep, которая освобождает GIL во время выполнения.Если ваша настоящая функция - чистый код Python и не выпускает GIL, то я рекомендую запустить кластер Dask со многими однопоточными процессами.Если вы используете dask-jobqueue, то вы хотите использовать ключевое слово processes= для управления процессами для каждого задания.

У вас может быть гораздо больше задач, чем у процессов.

...