Как векторизовать процесс Dask Apply - PullRequest
1 голос
/ 18 марта 2019

Подобно pandas GroupBy to List post , мы пытаемся запустить этот процесс в dask.

Наше текущее решение реализует функцию dataframe.apply . Поскольку в нашем процессе это бутылочное горлышко, есть ли другие варианты?
Ниже приведен пример кода с использованием данных dask.datasets.timeseries.

import dask
import dask.dataframe as dd
import pandas as pd

def set_list_att2(x: dd.Series):
        return list(set([item for item in x.values]))

df = dask.datasets.timeseries()
df_gb = df.groupby(df.name)
gp_col = ['x','y' ,'id']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att2, 
                                           meta=pd.Series(dtype='object', name=f'{att_col_gr}_att'))
                   for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
        df_edge_att = df_edge_att.join(ser.compute().to_frame(), how='left')        
df_edge_att.head()

dataframe result

Примечание в линии

df_edge_att = df_edge_att.join(ser.compute().to_frame(), how='left')  

мы добавили compute в противном случае пример кода вернул только 1 строку в окончательном фрейме данных.

1 Ответ

0 голосов
/ 23 апреля 2019

Я провел некоторый тест и определенно пытаюсь использовать dd.Aggregation, а не apply. смотрите ниже результаты:

%%timeit
df = dask.datasets.timeseries()
df_gb = df.groupby(df.name)
gp_col = ['x','y' ,'id']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att2, 
                                           meta=pd.Series(dtype='object', name=f'{att_col_gr}_att'))
                   for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')        
df_edge_att.head()

Результаты:
5 мин 44 с ± 11,2 с на цикл (среднее ± стандартное отклонение из 7 циклов, по 1 циклу каждый)

Однако при работе с dd.Aggregation произошло значительное улучшение:

%%timeit
df = dask.datasets.timeseries()
custom_agg = dd.Aggregation(
    'custom_agg', 
    lambda s: s.apply(set), 
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),
)
df_gb = df.groupby(df.name)
gp_col = ['x','y' ,'id']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')        
df_edge_att.head()

Результаты:
2 мин ± 1,13 с на цикл (среднее ± стандартное отклонение из 7 циклов, по 1 циклу каждый)

...