Распараллеливание задач в памяти в dask с использованием общей памяти (без отправки в другие процессы)? - PullRequest
0 голосов
/ 04 июля 2018

У меня есть тривиально распараллеливаемая проблема в памяти, но та, которая не дает больших ускорений с обычной многопроцессорной обработкой Python (только в 2 раза) из-за необходимости пересылки большого количества данных назад и вперед между процессами. Надеюсь, что может помочь.

Мой код в основном выглядит так:

delayed_results = []
for key, kdf in natsorted(scdf.groupby(grpby_key)):
    d1 = dd.from_pandas(kdf, npartitions=1)
    d2 = dd.from_pandas(other_dfs[key], npartitions=1)

    result = dask.delayed(function)(d1, d2, key=key, n_jobs=n_jobs, **kwargs)
    delayed_results.append(result)

outdfs = dask.compute(*delayed_results)

Вот как выглядел мой старый код Joblib:

outdfs = Parallel(n_jobs=n_jobs)(delayed(function)(scdf, other_dfs[key], key=key, n_jobs=n_jobs, **kwargs) for key, scdf in natsorted(scdf.groupby(grpby_key)))

Однако код dask намного медленнее и потребляет больше памяти, как для многопоточных, так и для многопроцессорных планировщиков. Я надеялся, что dask можно будет использовать для распараллеливания задач без необходимости отправлять данные другим процессам. Есть ли способ использовать несколько процессов с dask, используя общую память?


Btw. Документы имеют ссылку на http://distributed.readthedocs.io/en/latest/local-cluster.html, где они объясняют, что этот планировщик

Он обрабатывает данные с большей сложностью, и поэтому может быть больше эффективнее, чем многопроцессорный планировщик на рабочих нагрузках, которые требуют несколько процессов.

Но у них нет примеров его использования. Чем я должен заменить мой dask.compute() вызов в коде выше, чтобы попробовать локальный кластер?

1 Ответ

0 голосов
/ 06 июля 2018

Так что вы можете просто сделать следующее

from distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=4)
client = Client(cluster)

<your code>

Распределенный будет по умолчанию регистрироваться как исполнитель, и вы можете просто использовать dask.compute как обычно

...