Передача Dask недоступна во время вычислений - PullRequest
1 голос
/ 10 мая 2019

Я экспериментирую с Dask и хочу отправить поиск pandas.DataFrame на все рабочие узлы. К сожалению, это не с:

TypeError: ("'Future' object is not subscriptable", 'occurred at index 0')

Когда вместо lookup['baz'].iloc[2] используется lookup.result()['foo'].iloc[2], он работает нормально, но: для больших экземпляров входного фрейма данных он снова и снова застревает на from_pandas. Кроме того, кажется странным, что будущее необходимо блокировать вручную (снова и снова для каждой строки в операции применения. Есть ли способ блокировать будущее только один раз для каждого рабочего узла? Наивным улучшением может быть использование map_partitions, но это было бы возможно только в том случае, если число разделов довольно мало.

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

df_first = pd.DataFrame({'foo':[1,2,3]})
df_second = pd.DataFrame({'bar':[1,2,3], 'baz':[1,2,3]})

df_first_scattered = client.scatter(df_first, broadcast=True)
df_second_dask = dd.from_pandas(df_second, npartitions=2)


def foo(row, lookup):
    # TODO some computation which relies on the lookup
    return lookup['foo'].iloc[2]

df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis = 1, meta=('baz', 'int64'))
df_second_dask = df_second_dask.compute()
df_second_dask.head()

Фактически, эта наивная реализация dask кажется медленнее, чем обычные панды, для более крупных проблемных случаев. Я подозреваю, что низкая производительность выполнения связана с проблемой, поднятой выше.

1 Ответ

1 голос
/ 11 мая 2019

Вместо этого:

df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis = 1, meta=('baz', 'int64'))

Попробуйте вместо этого:

df_second_dask['foo'] = df_second_dask.apply(foo, args=[df_first_scattered], axis = 1, meta=('baz', 'int64'))

Ранее вы скрывали будущее внутри лямбда-функции. Даск не смог найти его, чтобы превратить в правильное значение. Вместо этого, когда мы передаем будущее в качестве правильного аргумента, Dask может определить его таким, какой он есть, и правильно дать вам значение.

...