Как найти входные аргументы concurrent.future для вызова распределенной функции Dask - PullRequest
0 голосов
/ 15 февраля 2019

Я использую Dask для распределения работы по кластеру.Я создаю кластер и вызываю .submit() для отправки функции в планировщик.Возвращает объект Futures.Я пытаюсь выяснить, как получить входные аргументы для этого будущего объекта после его завершения.

Например:

from dask.distributed import Client
from dask_yarn import YarnCluster

def somefunc(a,b,c ..., n ):
    # do something
    return


cluster = YarnCluster.from_specification(spec)
client = Client(cluster)

future = client.submit(somefunc, arg1, arg2, ..., argn)

# ^^^ how do I obtain the input arguments for this future object?
# `future.args` doesn't work

Ответы [ 2 ]

0 голосов
/ 20 февраля 2019

Фьючерсы не держатся за свои входы.Вы можете сделать это самостоятельно, хотя.

futures = {}
future = client.submit(func, *args)

futures[future] = args
0 голосов
/ 16 февраля 2019

Будущее знает только ключ, по которому оно однозначно известно в планировщике.Во время представления, если у него есть зависимости, они временно обнаруживаются и отправляются в планировщик, но не сохраняются, если хранятся локально.

Паттерн, который вам нужен, звучит больше как delayed, который сохраняет егоgraph, и, действительно, client.compute(delayed_thing) возвращает будущее.

d = delayed(somefunc)(a, b, c)
future = client.compute(d)
dict(d.dask)  # graph of things needed by d

Вы можете напрямую связаться с планировщиком, чтобы найти зависимости некоторого ключа, который, как правило, также будет ключом, и поэтому провести обратный инжиниринг графа, но это не похоже на отличный путь, поэтому я не буду пытаться описать его здесь.

...