пользовательские графы dask с функциями, которым требуются аргументы ключевых слов, рассчитанные dask - PullRequest
0 голосов
/ 04 июля 2018

Как можно построить собственный граф dask, используя функцию, для которой требуются ключевые аргументы, являющиеся результатом другой задачи dask?

Документация по dask и несколько вопросов о стековом потоке предлагают использовать partial, toolz или dask.compatibility.apply. Все эти решения работают для статических аргументов ключевых слов. Насколько я понимаю из Включение аргументов ключевых слов (kwargs) в пользовательские графы Dask и некоторое чтение исходного кода и отладчика, что dask.compatibility.apply может работать с аргументами ключевых слов, которые являются результатом вычисления dask , Однако я не могу понять правильный синтаксис и не могу найти ответ в другом месте.

В приведенном ниже примере показано относительно простое применение dask.compatibility.apply со значением ключевого слова, вычисляемым с помощью dask. Dask успешно передает значения вычисленных аргументов 'a' и 'b', а также значение статического ключевого слова 'other'. Однако он передает строку 'c' в функцию, а не заменяет ее вычисленным значением.

import dask
from dask.compatibility import apply


def custom_func(a, b, other=None, c=None):
    print(a, b, other, c)
    return a * b / c / other


dsk = {
    'a': (sum, (1, 1)),
    'b': (sum, (2, 2)),
    'c': (sum, (3, 3)),
    'd': (apply, custom_func, ['a', 'b'], {'c': 'c', 'other': 2})
}

dask.visualize(dsk, filename='graph.png')
for key in sorted(dsk):
    print(key)
    print(dask.get(dsk, key))
    print('\n')

Вывод ниже:

a
2


b
4


c
6


d
2 4 2 c
Traceback (most recent call last):
  File "dask_kwarg.py", line 20, in <module>
    print(dask.get(dsk, key))
  File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 562, in get_sync
    return get_async(apply_sync, 1, dsk, keys, **kwargs)
  File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 529, in get_async
    fire_task()
  File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 504, in fire_task
    callback=queue.put)
  File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 551, in apply_sync
    res = func(*args, **kwds)
  File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 295, in execute_task
    result = pack_exception(e, dumps)
  File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 290, in execute_task
    result = _execute_task(task, data)
  File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 271, in _execute_task
    return func(*args2)
  File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/compatibility.py", line 50, in apply
    return func(*args, **kwargs)
  File "dask_kwarg.py", line 7, in custom_func
    return a * b / c / other
TypeError: unsupported operand type(s) for /: 'int' and 'str'

graph.png

1 Ответ

0 голосов
/ 10 июля 2018

Один из способов - узнать, как dask.delayed это делает:)

In [1]: import dask

In [2]: @dask.delayed
   ...: def f(*args, **kwargs):
   ...:     pass
   ...: 

In [3]: dict(f(x=1).dask)
Out[3]: 
{'f-d2cd50e7-25b1-49c5-b463-f05198b09dfb': (<function dask.compatibility.apply>,
  <function __main__.f>,
  [],
  (dict, [['x', 1]]))}

Интересно, что это также тот случай, когда локальный планировщик и распределенный планировщик не согласны. Распределенный планировщик обрабатывает это нормально.

In [1]: from dask.distributed import Client

In [2]: client = Client()

In [3]: import dask
   ...: from dask.compatibility import apply
   ...: 
   ...: 
   ...: def custom_func(a, b, other=None, c=None):
   ...:     print(a, b, other, c)
   ...:     return a * b / c / other
   ...: 
   ...: 
   ...: dsk = {
   ...:     'a': (sum, (1, 1)),
   ...:     'b': (sum, (2, 2)),
   ...:     'c': (sum, (3, 3)),
   ...:     'd': (apply, custom_func, ['a', 'b'], {'c': 'c', 'other': 2})
   ...: }
   ...: 

In [4]: for key in sorted(dsk):
   ...:     print(key, client.get(dsk, key))
   ...:     
a 2
b 4
c 6
2 4 2 6
d 0.6666666666666666
...