Какой эффективный способ использовать groupby и применить пользовательскую функцию для огромного набора данных и избежать перемешивания? - PullRequest
0 голосов
/ 13 января 2019

Я пытаюсь использовать groupby и apply пользовательскую функцию для огромного набора данных, которая дает мне ошибки памяти, а рабочие гибнут из-за перестановок. Как я могу избежать тасования и сделать это эффективно.

Я читаю около пятидесяти 700 МБ (каждого) файлов паркета, и данные в этих файлах изолированы, то есть ни одна группа не существует более чем в одном файле. Если я пытаюсь запустить свой код для одного файла, он работает нормально, но не получается, когда я пытаюсь запустить полный набор данных.

В документации Dask говорится о проблемах с groupby, когда вы применяете пользовательскую функцию к группам, но они не предлагают решения для таких данных: http://docs.dask.org/en/latest/dataframe-groupby.html#difficult-cases

Как я могу обработать свой набор данных в разумные сроки (для групповой подачи заявки на один файл требуется около 6 минут) и, надеюсь, избежать перемешивания. Мне не нужно сортировать результаты, или groupby пытается отсортировать полный набор данных из разных файлов.

Я пытался использовать persist, но данные не помещаются в ОЗУ (32 ГБ). Хотя dask не поддерживает многостолбцовый индекс, но я попытался добавить индекс для одного столбца для поддержки groupby, но безрезультатно. Ниже показано, как выглядит структура кода:

from dask.dataframe import read_parquet

df = read_parquet('s3://s3_directory_path')
results = df.groupby(['A', 'B']).apply(custom_function).compute()

# custom function sorts the data within a group (the groups are small, less than 50 entries) on a field and computes some values based on heuristics (it computes 4 values, but I am showing 1 in example below and other 3 calculations are similar)

def custom_function(group):
    results = {}
    sorted_group = group.sort_values(['C']).reset_index(drop=True)
    sorted_group['delta'] = sorted_group['D'].diff()
    sorted_group.delta = sorted_group.delta.shift(-1)
    results['res1'] = (sorted_group[sorted_group.delta < -100]['D'].sum() - sorted_group.iloc[0]['D'])
    # similarly 3 more results are generated
    results_df = pd.DataFrame(results, index=[0])
    return results_df

Одна возможность состоит в том, что я обрабатываю один файл за раз и делаю это несколько раз, но в этом случае dask кажется бесполезным (без параллельной обработки), и для достижения желаемых результатов потребуются часы. Есть ли способ сделать это эффективно, используя dask или любую другую библиотеку? Как люди поступают с такими данными?

1 Ответ

0 голосов
/ 20 февраля 2019

Если вы хотите избежать перетасовки и можете пообещать, что группы хорошо изолированы, то вы можете просто позвонить панде groupby и подать заявку на каждый раздел с помощью map_partitions

df.map_partitions(lambda part: part.groupby(...).apply(...))
...