Как устранить чрезмерное использование жесткого диска (>> 100 ГБ) Dask Dataframe при перетасовке данных - PullRequest
1 голос
/ 19 сентября 2019

Мне нужно рассчитать статистику по сегментам больших (15 - 20 ГБ) файлов CSV.Это я делаю с groupby() в Dask Dataframe.

Проблема в том, что мне нужны пользовательские функции, потому что мне нужны эксцесс и перекос, которые не являются частью Dask.Поэтому я использую groupby().apply().Тем не менее, это заставляет Dask использовать огромное количество дискового пространства в моем каталоге Temp: более 150 ГБ только один раз запускает скрипт!Это приводит к тому, что на моем жестком диске заканчивается свободное место, что приводит к сбою скрипта.

Есть ли способ переписать код, который позволяет избежать записи такого огромного количества мусора в мой каталог Temp?

Пример кода приведен ниже:

  • Пример 1 выполняется относительно быстро и не генерирует тонны вывода Temp, но не поддерживает эксцесс или перекос.
  • Пример2 вычисляет также эксцесс и перекос, но заполняет мой жесткий диск, если я запускаю его для полного набора данных.

Буду признателен за любую помощь!

Кстати: на этой странице (https://docs.dask.org/en/latest/dataframe-groupby.html), предлагается использовать индексированный столбец для groupby(). Но, к сожалению, многоиндексированиеDask Dataframe не поддерживается, поэтому это не решает мою проблему.

import dask.dataframe as dd
import numpy as np
import scipy.stats as sps

ddf = dd.read_csv('18_GB_csv_file.csv')

segmentations = { 'seg1' : ['col1', 'col2'],
                 'seg2' : ['col1', 'col2', 'col3', 'col4'],
                 'seg3' : ['col3', 'col4'],
                 'seg4' : ['col1', 'col2', 'col5']
               }
data_cols = [ 'datacol1', 'datacol2', 'datacol3' ]


# Example 1: this runs fast and doesn't generate needless temp output.
# But it does not support "kurt" or "skew":

dd_comp = {}
for seg_group, seg_cols in segmentations.items():
   df_grouped = df.groupby(seg_cols)[data_cols]
   dd_comp[seg_group] = df_grouped.aggregate( ['mean', 'std', 'min', 'max'])

with ProgressBar():
   segmented_stats = dd.compute(dd_comp)



# Example 2: includes also "kurt" and "skew". But it is painfully slow 
# and generates >150 GB of Temp output before running out of disk space

empty_segment = pd.DataFrame( index=data_cols,
                             columns=['mean', 'three_sigma',
                                      'min', 'max', 'kurt', 'skew']
                           )
def segment_statistics(segment):
   stats = empty_segment.copy()
   for col in data_cols:
       stats.loc[col]['mean'] = np.mean(segment[col])
       stats.loc[col]['std'] = np.std(segment[col])
       stats.loc[col]['min'] = np.min(segment[col])
       stats.loc[col]['max'] = np.max(segment[col])
       stats.loc[col]['skew'] = sps.skew(segment[col])
       stats.loc[col]['kurt'] = sps.kurtosis(segment[col]) + 3
   return stats

dd_comp = {}
for seg_group, seg_cols in segmentations.items():
   df_grouped = df.groupby(seg_cols)[data_cols]
   dd_comp[seg_group] = df_grouped.apply( segment_statistics,
                                          meta=empty_segment )

with ProgressBar():
   segmented_stats = dd.compute(dd_comp)

1 Ответ

0 голосов
/ 21 сентября 2019

Похоже, что вы могли бы извлечь выгоду из пользовательских агрегатов: https://docs.dask.org/en/latest/dataframe-groupby.html#aggregate

Если вы можете придумать несколько хороших реализаций для моментов более высокого порядка, они также звучат так, как если бы они были хорошим вкладом в проект..

...