Я новичок в Dask и пытаюсь построить систему для выполнения графа вычислений с зависимостями. Однако меня очень смущает тот факт, что некоторые задачи выполняются дважды, хотя у них есть подпись stati c. Например:
Python 3.7.5 (default, Nov 12 2019, 11:34:05)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-18)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask.distributed import Client
>>> client = Client()
>>> def a():
... print("a")
...
>>> client.gather(client.submit(a))
a
>>> client.gather(client.submit(a))
a
>>> client.submit(a)
<Future: pending, key: a-a5eb50e9015acdf60b1094aa4e467e00>
a
>>> client.submit(a)
<Future: finished, type: builtins.NoneType, key: a-a5eb50e9015acdf60b1094aa4e467e00>
>>> client.gather(client.submit(a))
>>> client.gather(client.submit(a))
>>>
Таким образом, похоже, что a()
выполняется для каждого вызова с использованием client.gather(client.submit(a))
, но только до тех пор, пока я не вызову client.submit(a)
отдельно, после чего тот же Future
будет повторно использован и функция больше не вызывается. Это почему?
В моем графике вычислений это будет проблемой, когда две задачи полагаются на одну и ту же задачу, которая должна выполняться только один раз. Мой текущий подход к обработке таких зависимостей (рекурсивно) следующий:
from dask.distributed import Client, worker_client
def x(n):
dgraph = {
'a': [],
'b': ['a'],
'c': ['b', 'a'],
'd': ['b', 'c']
}
print(n)
with worker_client() as client:
client.gather(list(client.submit(x, d) for d in dgraph[n]))
if __name__ == '__main__':
client = Client()
result = client.submit(x, 'd')
client.gather(result)
Интересно, что результат python при выполнении этого скрипта нестабилен:
$ python test_dask2.py
d
b
c
a
b
a
a
$ python test_dask2.py
d
b
c
a
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': 'x-bff0c0d6e4239ae9c5beaed070018a1e'}
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': 'x-59dc11a9fc2db8a0885e47d3e5891304'}
$
Есть ли способ гарантировать, что данная задача с заданными входными данными выполняется только один раз, даже если я отправлю ее несколько раз? Если я правильно понимаю документацию , это должно быть нормальным поведением. Если вызов print
является побочным эффектом, который предотвращает это, почему он не согласован и как я могу, например, предотвратить выполнение задачи, генерирующей выходной файл, дважды?
Кроме того, какая ошибка в конце, которая не всегда возникает?
EDIT:
Думаю, я понял, почему некоторые из моих задач выполнялись несколько раз во втором фрагменте: значение ha sh, которое submit
dask назначает задачам для идентификация иногда меняется даже при отправке одной и той же задачи несколько раз (и даже до того, как задача завершится и выйдет за рамки). Установка фиксированного значения параметра key
(например, имени задачи) в submit
устраняет эту проблему.