У меня есть тривиально распараллеливаемая проблема в памяти, но та, которая не дает больших ускорений с обычной многопроцессорной обработкой Python (только в 2 раза) из-за необходимости пересылки большого количества данных назад и вперед между процессами. Надеюсь, что может помочь.
Мой код в основном выглядит так:
delayed_results = []
for key, kdf in natsorted(scdf.groupby(grpby_key)):
d1 = dd.from_pandas(kdf, npartitions=1)
d2 = dd.from_pandas(other_dfs[key], npartitions=1)
result = dask.delayed(function)(d1, d2, key=key, n_jobs=n_jobs, **kwargs)
delayed_results.append(result)
outdfs = dask.compute(*delayed_results)
Вот как выглядел мой старый код Joblib:
outdfs = Parallel(n_jobs=n_jobs)(delayed(function)(scdf, other_dfs[key], key=key, n_jobs=n_jobs, **kwargs) for key, scdf in natsorted(scdf.groupby(grpby_key)))
Однако код dask намного медленнее и потребляет больше памяти, как для многопоточных, так и для многопроцессорных планировщиков. Я надеялся, что dask можно будет использовать для распараллеливания задач без необходимости отправлять данные другим процессам. Есть ли способ использовать несколько процессов с dask, используя общую память?
Btw. Документы имеют ссылку на http://distributed.readthedocs.io/en/latest/local-cluster.html, где они объясняют, что этот планировщик
Он обрабатывает данные с большей сложностью, и поэтому может быть больше
эффективнее, чем многопроцессорный планировщик на рабочих нагрузках, которые требуют
несколько процессов.
Но у них нет примеров его использования. Чем я должен заменить мой dask.compute()
вызов в коде выше, чтобы попробовать локальный кластер?