Триггер Dask рабочие освободить память - PullRequest
0 голосов
/ 30 апреля 2019

Я распространяю вычисления некоторых функций, используя Dask. Мой общий макет выглядит так:


    from dask.distributed import Client, LocalCluster, as_completed

    cluster = LocalCluster(processes=config.use_dask_local_processes,
                           n_workers=1,
                           threads_per_worker=1,
                           )
    client = Client(cluster)
    cluster.scale(config.dask_local_worker_instances)

    work_futures = []

    # For each group do work
    for group in groups:
        fcast_futures.append(client.submit(_work, group))

    # Wait till the work is done
    for done_work in as_completed(fcast_futures, with_results=False):
        try:
            result = done_work.result()
        except Exception as error:
            log.exception(error)

Моя проблема в том, что для большого количества заданий я склонен выходить за пределы памяти. Я вижу много:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 1.15 GB -- Worker memory limit: 1.43 GB

Кажется, что каждое будущее не освобождает свою память. Как я могу вызвать это? Я использую dask == 1.2.0 на Python 2.7.

1 Ответ

0 голосов
/ 30 апреля 2019

Результаты помогают планировщику, если на него указывает будущее клиента.Память освобождается, когда (или вскоре после этого) python собирает мусор в прошлом.В вашем случае вы сохраняете все свои фьючерсы в списке на протяжении всего вычисления.Вы можете попробовать изменить цикл:

for done_work in as_completed(fcast_futures, with_results=False):
    try:
        result = done_work.result()
    except Exception as error:
        log.exception(error)    
    done_work.release()

или заменить цикл as_completed чем-то, что явно удаляет фьючерсы из списка после их обработки.

...