Множественные пользовательские функции агрегации в кадре данных Dask - PullRequest
0 голосов
/ 03 сентября 2018

Я обрабатываю набор данных с помощью Dask (учитывая, что он не помещается в памяти), и я хочу сгруппировать экземпляры с помощью другой функции агрегирования в зависимости от столбца и его типа.

Dask имеет набор функций агрегирования по умолчанию для числовых типов данных, но не для строк / объектов. Есть ли способ реализовать пользовательскую функцию агрегирования для строк, несколько похожую на пример ниже?

atts_to_group = {'A', 'B'}
agg_fn = {
  'C': 'mean'  #int
  'D': 'concatenate_fn1'  #string - No default fn for strings - Doesn't work
  'E': 'concatenate_fn2'  #string
}
ddf = ddf.groupby(atts_to_group).agg(agg_fn).compute().reset_index()

На данный момент я могу прочитать весь набор данных в памяти после удаления ненужных столбцов / строк, но я бы предпочел продолжить обработку в Dask, учитывая, что он быстрее выполняет требуемые операции.

Edit: Попытка добавления пользовательской функции прямо в словарь:

def custom_concat(df):
    ...
    return df_concatd

agg_fn = {
  'C': 'mean'  #int
  'D': custom_concat(df)
}

-------------------------------------------------------
ValueError: unknown aggregate Dask DataFrame Structure:

1 Ответ

0 голосов
/ 03 сентября 2018

Реализованный Dask предоставляет Структура данных агрегации . Пользовательское агрегирование может быть выполнено следующим образом:

# Concatenates the strings and separates them using ","
custom_concat = dd.Aggregation('custom_sum', lambda x: ",".join(str(x)), lambda x0: ",".join(str(x0)))
custom_concat_E = ...

atts_to_group = {'A', 'B'}
agg_fn = {
  'C': 'mean'  #int
  'D': custom_concat_D
  'E': custom_concat_E
}
ddf = ddf.groupby(atts_to_group).agg(agg_fn).compute().reset_index()

Это также можно сделать с помощью Dataframe.apply для менее подробного решения

def agg_fn(x):
    return pd.Series(
        dict(
            C = x['C'].mean(), # int
            D = "{%s}" % ', '.join(x['D']), # string (concat strings)
            E = ...
        )
    )

ddf = ddf.groupby(atts_to_group).apply(agg_fn).compute().reset_index
...