Я распространяю вычисления некоторых функций, используя Dask. Мой общий макет выглядит так:
from dask.distributed import Client, LocalCluster, as_completed
cluster = LocalCluster(processes=config.use_dask_local_processes,
n_workers=1,
threads_per_worker=1,
)
client = Client(cluster)
cluster.scale(config.dask_local_worker_instances)
work_futures = []
# For each group do work
for group in groups:
fcast_futures.append(client.submit(_work, group))
# Wait till the work is done
for done_work in as_completed(fcast_futures, with_results=False):
try:
result = done_work.result()
except Exception as error:
log.exception(error)
Моя проблема в том, что для большого количества заданий я склонен выходить за пределы памяти. Я вижу много:
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 1.15 GB -- Worker memory limit: 1.43 GB
Кажется, что каждое будущее не освобождает свою память. Как я могу вызвать это? Я использую dask == 1.2.0 на Python 2.7.