Как Dask решает, запускать задачу повторно или нет - PullRequest
0 голосов
/ 29 мая 2020

Я новичок в Dask и пытаюсь построить систему для выполнения графа вычислений с зависимостями. Однако меня очень смущает тот факт, что некоторые задачи выполняются дважды, хотя у них есть подпись stati c. Например:

Python 3.7.5 (default, Nov 12 2019, 11:34:05)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-18)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask.distributed import Client
>>> client = Client()
>>> def a():
...   print("a")
...
>>> client.gather(client.submit(a))
a
>>> client.gather(client.submit(a))
a
>>> client.submit(a)
<Future: pending, key: a-a5eb50e9015acdf60b1094aa4e467e00>
a
>>> client.submit(a)
<Future: finished, type: builtins.NoneType, key: a-a5eb50e9015acdf60b1094aa4e467e00>
>>> client.gather(client.submit(a))
>>> client.gather(client.submit(a))
>>>

Таким образом, похоже, что a() выполняется для каждого вызова с использованием client.gather(client.submit(a)), но только до тех пор, пока я не вызову client.submit(a) отдельно, после чего тот же Future будет повторно использован и функция больше не вызывается. Это почему?

В моем графике вычислений это будет проблемой, когда две задачи полагаются на одну и ту же задачу, которая должна выполняться только один раз. Мой текущий подход к обработке таких зависимостей (рекурсивно) следующий:

from dask.distributed import Client, worker_client

def x(n):
    dgraph = {
        'a': [],
        'b': ['a'],
        'c': ['b', 'a'],
        'd': ['b', 'c']
        }
    print(n)
    with worker_client() as client:
        client.gather(list(client.submit(x, d) for d in dgraph[n]))

if __name__ == '__main__':
    client = Client()
    result = client.submit(x, 'd')
    client.gather(result)

Интересно, что результат python при выполнении этого скрипта нестабилен:

$ python test_dask2.py
d
b
c
a
b
a
a
$ python test_dask2.py
d
b
c
a
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': 'x-bff0c0d6e4239ae9c5beaed070018a1e'}
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': 'x-59dc11a9fc2db8a0885e47d3e5891304'}
$

Есть ли способ гарантировать, что данная задача с заданными входными данными выполняется только один раз, даже если я отправлю ее несколько раз? Если я правильно понимаю документацию , это должно быть нормальным поведением. Если вызов print является побочным эффектом, который предотвращает это, почему он не согласован и как я могу, например, предотвратить выполнение задачи, генерирующей выходной файл, дважды?

Кроме того, какая ошибка в конце, которая не всегда возникает?

EDIT:

Думаю, я понял, почему некоторые из моих задач выполнялись несколько раз во втором фрагменте: значение ha sh, которое submit dask назначает задачам для идентификация иногда меняется даже при отправке одной и той же задачи несколько раз (и даже до того, как задача завершится и выйдет за рамки). Установка фиксированного значения параметра key (например, имени задачи) в submit устраняет эту проблему.

Ответы [ 2 ]

2 голосов
/ 29 мая 2020

Короткий ответ: dask сохраняет результат в памяти, пока он кому-то нужен. В этих случаях «потребность» может быть либо будущим в вашем сеансе, либо другой задачей, которая зависит от результата.

В строке типа client.gather(client.submit(a)) будущее, сгенерированное submit, забывается сразу после него. собрано. В строке типа client.submit(a) сгенерированное будущее сохраняется в переменной «последний результат» _ сеанса, и поэтому остается в памяти, и кластер не очищает его.

Если вам нужен больший контроль , вы можете назначить эти переменные:

fut = client.submit(a)  # sets func running, keeps hold of the future
fut2 = client.submit(a)  # uses already existing task to get result
client.gather(fut), fut.result() # get results
del fut2, fut  #  "forget" futures, and have cluster release them

Не забудьте использовать панель управления, чтобы увидеть текущее состояние кластера.

0 голосов
/ 13 июня 2020

Думаю, вы ищете следующую страницу документации https://distributed.dask.org/en/latest/memory.html

...