Я пытаюсь использовать groupby
и apply
пользовательскую функцию для огромного набора данных, которая дает мне ошибки памяти, а рабочие гибнут из-за перестановок. Как я могу избежать тасования и сделать это эффективно.
Я читаю около пятидесяти 700 МБ (каждого) файлов паркета, и данные в этих файлах изолированы, то есть ни одна группа не существует более чем в одном файле. Если я пытаюсь запустить свой код для одного файла, он работает нормально, но не получается, когда я пытаюсь запустить полный набор данных.
В документации Dask говорится о проблемах с groupby
, когда вы применяете пользовательскую функцию к группам, но они не предлагают решения для таких данных:
http://docs.dask.org/en/latest/dataframe-groupby.html#difficult-cases
Как я могу обработать свой набор данных в разумные сроки (для групповой подачи заявки на один файл требуется около 6 минут) и, надеюсь, избежать перемешивания. Мне не нужно сортировать результаты, или groupby
пытается отсортировать полный набор данных из разных файлов.
Я пытался использовать persist
, но данные не помещаются в ОЗУ (32 ГБ). Хотя dask не поддерживает многостолбцовый индекс, но я попытался добавить индекс для одного столбца для поддержки groupby, но безрезультатно. Ниже показано, как выглядит структура кода:
from dask.dataframe import read_parquet
df = read_parquet('s3://s3_directory_path')
results = df.groupby(['A', 'B']).apply(custom_function).compute()
# custom function sorts the data within a group (the groups are small, less than 50 entries) on a field and computes some values based on heuristics (it computes 4 values, but I am showing 1 in example below and other 3 calculations are similar)
def custom_function(group):
results = {}
sorted_group = group.sort_values(['C']).reset_index(drop=True)
sorted_group['delta'] = sorted_group['D'].diff()
sorted_group.delta = sorted_group.delta.shift(-1)
results['res1'] = (sorted_group[sorted_group.delta < -100]['D'].sum() - sorted_group.iloc[0]['D'])
# similarly 3 more results are generated
results_df = pd.DataFrame(results, index=[0])
return results_df
Одна возможность состоит в том, что я обрабатываю один файл за раз и делаю это несколько раз, но в этом случае dask кажется бесполезным (без параллельной обработки), и для достижения желаемых результатов потребуются часы. Есть ли способ сделать это эффективно, используя dask или любую другую библиотеку? Как люди поступают с такими данными?