Как можно построить собственный граф dask, используя функцию, для которой требуются ключевые аргументы, являющиеся результатом другой задачи dask?
Документация по dask и несколько вопросов о стековом потоке предлагают использовать partial
, toolz
или dask.compatibility.apply
. Все эти решения работают для статических аргументов ключевых слов. Насколько я понимаю из Включение аргументов ключевых слов (kwargs) в пользовательские графы Dask и некоторое чтение исходного кода и отладчика, что dask.compatibility.apply
может работать с аргументами ключевых слов, которые являются результатом вычисления dask , Однако я не могу понять правильный синтаксис и не могу найти ответ в другом месте.
В приведенном ниже примере показано относительно простое применение dask.compatibility.apply
со значением ключевого слова, вычисляемым с помощью dask. Dask успешно передает значения вычисленных аргументов 'a'
и 'b'
, а также значение статического ключевого слова 'other'
. Однако он передает строку 'c'
в функцию, а не заменяет ее вычисленным значением.
import dask
from dask.compatibility import apply
def custom_func(a, b, other=None, c=None):
print(a, b, other, c)
return a * b / c / other
dsk = {
'a': (sum, (1, 1)),
'b': (sum, (2, 2)),
'c': (sum, (3, 3)),
'd': (apply, custom_func, ['a', 'b'], {'c': 'c', 'other': 2})
}
dask.visualize(dsk, filename='graph.png')
for key in sorted(dsk):
print(key)
print(dask.get(dsk, key))
print('\n')
Вывод ниже:
a
2
b
4
c
6
d
2 4 2 c
Traceback (most recent call last):
File "dask_kwarg.py", line 20, in <module>
print(dask.get(dsk, key))
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 562, in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 529, in get_async
fire_task()
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 504, in fire_task
callback=queue.put)
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 551, in apply_sync
res = func(*args, **kwds)
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 295, in execute_task
result = pack_exception(e, dumps)
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 290, in execute_task
result = _execute_task(task, data)
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 271, in _execute_task
return func(*args2)
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/compatibility.py", line 50, in apply
return func(*args, **kwargs)
File "dask_kwarg.py", line 7, in custom_func
return a * b / c / other
TypeError: unsupported operand type(s) for /: 'int' and 'str'