Dask применить с пользовательской функцией - PullRequest
0 голосов
/ 16 марта 2020

Я экспериментирую с Dask, но у меня возникла проблема при использовании apply после группировки.

У меня есть Dask DataFrame с большим количеством строк. Давайте рассмотрим, например, следующее

N=10000
df = pd.DataFrame({'col_1':np.random.random(N), 'col_2': np.random.random(N) })
ddf = dd.from_pandas(df, npartitions=8)

Я хочу добавить значения col_1, и я следую решению от здесь

bins = np.linspace(0,1,11)
labels = list(range(len(bins)-1))
ddf2 = ddf.map_partitions(test_f, 'col_1',bins,labels)

, где

def test_f(df,col,bins,labels):
    return df.assign(bin_num = pd.cut(df[col],bins,labels=labels))

и это работает так, как я ожидаю.

Теперь я хочу взять значение медианы в каждом бине (взято из здесь )

median = ddf2.groupby('bin_num')['col_1'].apply(pd.Series.median).compute()

Имея 10 ячеек, я ожидаю, что median будет иметь 10 строк, но на самом деле их 80. В кадре данных есть 8 разделов, поэтому я предполагаю, что приложение как-то работает с каждым из них по отдельности.

Однако, Если я хочу получить среднее значение и использовать mean

median = ddf2.groupby('bin_num')['col_1'].mean().compute()

, оно работает, и на выходе есть 10 строк.

Тогда возникает вопрос: что я делаю неправильно, что предотвращает apply работает как mean?

Ответы [ 2 ]

1 голос
/ 15 апреля 2020

Вы правы! Я смог воспроизвести вашу проблему на Dask 2.11.0. Хорошей новостью является то, что есть решение! Похоже, что проблема группового Dask связана именно с типом категории (pandas .core.dtypes.dtypes.CategoricalDtype). Приведя столбец категории к другому типу столбца (float, int, str), группа будет работать правильно.

Вот ваш код, который я скопировал:

import dask.dataframe as dd
import pandas as pd
import numpy as np


def test_f(df, col, bins, labels):
    return df.assign(bin_num=pd.cut(df[col], bins, labels=labels))

N = 10000
df = pd.DataFrame({'col_1': np.random.random(N), 'col_2': np.random.random(N)})
ddf = dd.from_pandas(df, npartitions=8)

bins = np.linspace(0,1,11)
labels = list(range(len(bins)-1))
ddf2 = ddf.map_partitions(test_f, 'col_1', bins, labels)

print(ddf2.groupby('bin_num')['col_1'].apply(pd.Series.median).compute())

, который печатает Вы упомянули проблему

bin_num
0         NaN
1         NaN
2         NaN
3         NaN
4         NaN
       ...   
5    0.550844
6    0.651036
7    0.751220
8         NaN
9         NaN
Name: col_1, Length: 80, dtype: float64

Вот мое решение:

ddf3 = ddf2.copy()
ddf3["bin_num"] = ddf3["bin_num"].astype("int")

print(ddf3.groupby('bin_num')['col_1'].apply(pd.Series.median).compute())

, которое напечатало:

bin_num
9    0.951369
2    0.249150
1    0.149563
0    0.049897
3    0.347906
8    0.847819
4    0.449029
5    0.550608
6    0.652778
7    0.749922
Name: col_1, dtype: float64

@ MRocklin или @TomAugspurger. Сможете ли вы создать исправление для этого в новом выпуске? Я думаю, что здесь достаточно воспроизводимого кода. Спасибо за ваш тяжелый труд. Я люблю Dask и использую его каждый день;)

1 голос
/ 17 марта 2020

Может быть, это предупреждение является ключом ( Dask do c: SeriesGroupBy.apply ):

groupby-apply Панд можно использовать для применения произвольных функций, включая агрегаты, которые приводят к одному ряду на группу. Groupby-apply Dask будет применять fun c один раз к каждой паре групп-групп , поэтому, когда fun c - это сокращение, вы получите одну строку на пару пар-групп. Чтобы применить пользовательскую агрегацию с Dask, используйте dask.dataframe.groupby.Aggregation.

...