Dask HighLevelGraph вычисление короткого замыкания - PullRequest
1 голос
/ 30 мая 2019

Я пытаюсь взять DataFrame ddf и вернуть новый DataFrame, идентичный ddf, за исключением случаев, когда ddf имеет пустой раздел, он должен указывать на самый последний непустой компонент. Например, если ddf имеет разделы [P1, P2, P3, P4, P5, P6], где P2, P3 и P6 являются пустыми кадрами данных Pandas, то он возвращает следующий кадр данных Dask: [P1, P1, P1, P4, P5, P5]. Мой код

name = 'prev-nonempty-' + tokenize(ddf)
meta = ddf._meta
dsk = dict()
def helper(A, B):
  return B if A.empty else A
dsk[(name, 0)] = (helper, (ddf._name, 0), None)
for i in range(1, len(ddf.divisions)-1):
    dsk[(name, i)] = (helper, (ddf._name, i), (name, i-1))
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[ddf])
return new_dd_object(graph, name, meta, ddf.divisions)

У меня вопрос: есть ли способ выполнять вычисления на коротких замыканиях в Dask HighLevelGraphs, чтобы вычисление i-го раздела останавливалось раньше, если он обнаружил непустой раздел.

Здесь говорится здесь , что

В таких случаях, как (add, 'x', 'y'), функции, такие как add, получают конкретные значения вместо ключей. Планировщик Dask заменяет ключи (например, x и y) их вычисленными значениями (например, 1 и 2) до , вызывая функцию add.

что говорит о том, что вы не можете замкнуть его, но, возможно, есть более сложные приемы планировщика Dask, которые я мог бы использовать?

1 Ответ

1 голос
/ 02 июня 2019

Нет, для стандартных графиков задач это сделать невозможно.Однако вы можете внедрить эту логику в саму свою функцию.

def func(accumulator, new_data):
    if is_done(accumulator):
        return accumulator 

Таким образом, вы по-прежнему выполняете все задачи, но они выполняются очень быстро после выполнения вашего условия.

Вы также можете рассмотреть возможность использования Dask Futures, но это немного более низкий уровень.https://docs.dask.org/en/latest/futures.html

...