У меня есть pandas.DataFrame
объект с именем df
, и я хочу интерполировать пропущенные значения с помощью распараллеливания. Вот что я делаю:
def func(df):
return df.interpolate(method='linear', axis=1)
ddf = dd.from_pandas(df, npartitions=8)
res = ddf.map_partitions(func)
res2 = res.compute()
Итог,
print(res2)
0 None
1 None
2 None
3 None
4 None
5 None
6 None
7 None
dtype: object
и
type(res)
dask.dataframe.core.Series
Редактировать 1
Следуя совету @mdurant, я изменил функцию на
def func(df):
return df.interpolate(method='linear', axis=1, inplace=True)
и теперь результат ожидаемый.
Тем не менее, у меня все еще есть вопросы новичка относительно этого кода. Приведенные ниже тесты показывают, что непараллельная версия работает быстрее, чем параллельная.
Непараллельно:
%time df.interpolate(method='linear', axis=1, inplace=True)
Interpolating missing values.
CPU times: user 19.5 s, sys: 162 ms, total: 19.7 s
Wall time: 19.8 s
Параллельное:
res = ddf.map_partitions(func)
%time res2 = res.compute()
Interpolating missing values.Interpolating missing values.
Interpolating missing values.Interpolating missing values.
Interpolating missing values.Interpolating missing values.
Interpolating missing values.Interpolating missing values.
CPU times: user 29.1 s, sys: 2.3 s, total: 31.4 s
Wall time: 26.5 s
res.visualize()
Эта интерполяция является построчной операцией (интерполяция в row=1
), поэтому любой фрагмент (поток) показывается без штрафных санкций (разделение происходит между индексами).