Мне нужно рассчитать статистику по сегментам больших (15 - 20 ГБ) файлов CSV.Это я делаю с groupby()
в Dask Dataframe.
Проблема в том, что мне нужны пользовательские функции, потому что мне нужны эксцесс и перекос, которые не являются частью Dask.Поэтому я использую groupby().apply()
.Тем не менее, это заставляет Dask использовать огромное количество дискового пространства в моем каталоге Temp: более 150 ГБ только один раз запускает скрипт!Это приводит к тому, что на моем жестком диске заканчивается свободное место, что приводит к сбою скрипта.
Есть ли способ переписать код, который позволяет избежать записи такого огромного количества мусора в мой каталог Temp?
Пример кода приведен ниже:
- Пример 1 выполняется относительно быстро и не генерирует тонны вывода Temp, но не поддерживает эксцесс или перекос.
- Пример2 вычисляет также эксцесс и перекос, но заполняет мой жесткий диск, если я запускаю его для полного набора данных.
Буду признателен за любую помощь!
Кстати: на этой странице (https://docs.dask.org/en/latest/dataframe-groupby.html), предлагается использовать индексированный столбец для groupby()
. Но, к сожалению, многоиндексированиеDask Dataframe не поддерживается, поэтому это не решает мою проблему.
import dask.dataframe as dd
import numpy as np
import scipy.stats as sps
ddf = dd.read_csv('18_GB_csv_file.csv')
segmentations = { 'seg1' : ['col1', 'col2'],
'seg2' : ['col1', 'col2', 'col3', 'col4'],
'seg3' : ['col3', 'col4'],
'seg4' : ['col1', 'col2', 'col5']
}
data_cols = [ 'datacol1', 'datacol2', 'datacol3' ]
# Example 1: this runs fast and doesn't generate needless temp output.
# But it does not support "kurt" or "skew":
dd_comp = {}
for seg_group, seg_cols in segmentations.items():
df_grouped = df.groupby(seg_cols)[data_cols]
dd_comp[seg_group] = df_grouped.aggregate( ['mean', 'std', 'min', 'max'])
with ProgressBar():
segmented_stats = dd.compute(dd_comp)
# Example 2: includes also "kurt" and "skew". But it is painfully slow
# and generates >150 GB of Temp output before running out of disk space
empty_segment = pd.DataFrame( index=data_cols,
columns=['mean', 'three_sigma',
'min', 'max', 'kurt', 'skew']
)
def segment_statistics(segment):
stats = empty_segment.copy()
for col in data_cols:
stats.loc[col]['mean'] = np.mean(segment[col])
stats.loc[col]['std'] = np.std(segment[col])
stats.loc[col]['min'] = np.min(segment[col])
stats.loc[col]['max'] = np.max(segment[col])
stats.loc[col]['skew'] = sps.skew(segment[col])
stats.loc[col]['kurt'] = sps.kurtosis(segment[col]) + 3
return stats
dd_comp = {}
for seg_group, seg_cols in segmentations.items():
df_grouped = df.groupby(seg_cols)[data_cols]
dd_comp[seg_group] = df_grouped.apply( segment_statistics,
meta=empty_segment )
with ProgressBar():
segmented_stats = dd.compute(dd_comp)