Вопрос относительно map_partitions в объекте dask.dataframe - PullRequest
0 голосов
/ 29 октября 2018

У меня есть pandas.DataFrame объект с именем df, и я хочу интерполировать пропущенные значения с помощью распараллеливания. Вот что я делаю:

def func(df):
    return df.interpolate(method='linear', axis=1)


ddf = dd.from_pandas(df, npartitions=8)
res = ddf.map_partitions(func)
res2 = res.compute()

Итог,

print(res2)
0    None
1    None
2    None
3    None
4    None
5    None
6    None
7    None
dtype: object

и

type(res)
dask.dataframe.core.Series

Редактировать 1 Следуя совету @mdurant, я изменил функцию на

def func(df):
    return df.interpolate(method='linear', axis=1, inplace=True) 

и теперь результат ожидаемый.

Тем не менее, у меня все еще есть вопросы новичка относительно этого кода. Приведенные ниже тесты показывают, что непараллельная версия работает быстрее, чем параллельная.

Непараллельно:

%time df.interpolate(method='linear', axis=1, inplace=True)
Interpolating missing values.
CPU times: user 19.5 s, sys: 162 ms, total: 19.7 s
Wall time: 19.8 s

Параллельное:

res = ddf.map_partitions(func)
%time res2 = res.compute()
Interpolating missing values.Interpolating missing values.
Interpolating missing values.Interpolating missing values.
Interpolating missing values.Interpolating missing values.
Interpolating missing values.Interpolating missing values.
CPU times: user 29.1 s, sys: 2.3 s, total: 31.4 s
Wall time: 26.5 s

res.visualize()

enter image description here

Эта интерполяция является построчной операцией (интерполяция в row=1), поэтому любой фрагмент (поток) показывается без штрафных санкций (разделение происходит между индексами).

1 Ответ

0 голосов
/ 29 октября 2018

Проблема здесь inplace=True - при этом вызов interpolate ничего не возвращает, поэтому вывод func() - None, и вы получаете результаты, которые видите. Как правило, функции Dask должны возвращать обработанные данные, а не пытаться изменить данные на месте. Просто удалите ключевое слово, и все, вероятно, будет работать.

...