Я читаю в 64 сжатых CSV-файлах (вероятно, 70-80 ГБ) в один фрейм данных dask, затем выполняю групповую обработку с агрегациями.
Задание никогда не завершается, потому что вначале groupby создает фрейм данных только с однимраздел.
В этом посте и в этом посте эта проблема уже решена, но сосредоточена на вычислительном графе, а не на проблеме памяти, с которой вы столкнулись, когда ваш результирующий фрейм данныхслишком большой.
Я попытался обойти обход с повторным распределением, но работа все еще не завершена.
Что я делаю не так, придется ли мне использовать map_partition?Это очень запутанно, так как я ожидаю, что Dask позаботится о разбиении всего, даже после операций агрегирования.
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1, memory_limit='8GB',diagnostics_port=5000)
client
dask.config.set(scheduler='processes')
dB3 = dd.read_csv("boden/expansion*.csv", # read in parallel
blocksize=None, # 64 files
sep=',',
compression='gzip'
)
aggs = {
'boden': ['count','min']
}
dBSelect=dB3.groupby(['lng','lat']).agg(aggs).repartition(npartitions=64)
dBSelect=dBSelect.reset_index()
dBSelect.columns=['lng','lat','bodenCount','boden']
dBSelect=dBSelect.drop('bodenCount',axis=1)
with ProgressBar(dt=30): dBSelect.compute().to_parquet('boden/final/boden_final.parq',compression=None)