Dask Distributed: доступ к клиентскому фьючерсу из отдельного процесса - PullRequest
0 голосов
/ 19 апреля 2020

Я запустил много симуляций с Dask Distributed :

from time import sleep
from distributed import Client, as_completed

def simulation(x):
    """ Proxy function for simulation """
    sleep(60 * 60 * 24)  # wait one day
    return hash(x)

def save(result):
    with open("result", "w") as f:
        print(result, file=f)

if __name__ == "__main__":
    client = Client("localhost:8786")
    futures = client.map(simulation, range(1000))

    for future in as_completed(future):
        result = future.result()
        save(result)  

Однако в этом коде есть ошибка: open("result", "w") должно быть open(str(result), "w"). Я хотел бы исправить эту ошибку, повторную обработку клиентского фьючерса.

Однако я не знаю способа сделать это без остановки процесса Python с помощью прерывания клавиатуры, чем повторная обработка. отправка заданий в кластер Dask. Я не хочу этого делать, потому что это моделирование заняло пару дней.

Я хочу получить доступ ко всему будущему клиента и сохранить все существующие результаты. Как мне это сделать?

Возможные вопросы

1 Ответ

0 голосов
/ 19 апреля 2020

client.has_what - это метод, который вы ищете:

from distributed import Client, Future

if __name__ == "__main__":
    client = Client("localhost:8786")
    futures = [Future(key) for keys in client.has_what().values() for key in keys]

    for future in as_completed(futures):
        ...
...