У меня есть Dask DataFrames , который содержит индекс, который не является уникальным (client_id
).Перераспределение и сброс индекса заканчиваются очень неравномерными разделами - некоторые содержат только несколько строк, некоторые тысячи.Например, следующий код:
for p in range(ddd.npartitions):
print(len(ddd.get_partition(p)))
распечатывает что-то подобное:
55
17
5
41
51
1144
4391
75153
138970
197105
409466
415925
486076
306377
543998
395974
530056
374293
237
12
104
52
28
Мой DataFrame имеет горячее кодирование и содержит более 500 столбцов.Большие разделы не помещаются в памяти.Я хотел перераспределить DataFrame, чтобы иметь разделы даже по размеру.Знаете ли вы эффективный способ сделать это?
РЕДАКТИРОВАТЬ 1
Простое воспроизведение:
df = pd.DataFrame({'x':np.arange(0,10000),'y':np.arange(0,10000)})
df2 = pd.DataFrame({'x':np.append(np.arange(0,4995),np.arange(5000,10000,1000)),'y2':np.arange(0,10000,2)})
dd_df = dd.from_pandas(df, npartitions=10).set_index('x')
dd_df2= dd.from_pandas(df2, npartitions=5).set_index('x')
new_ddf=dd_df.merge(dd_df2, how='right')
#new_ddf = new_ddf.reset_index().set_index('x')
#new_ddf = new_ddf.repartition(npartitions=2)
new_ddf.divisions
for p in range(new_ddf.npartitions):
print(len(new_ddf.get_partition(p)))
Обратите внимание на последние разделы (один элемент):
1000
1000
1000
1000
995
1
1
1
1
1
Даже когда мы раскомментируем закомментированные строки, разделы остаются неравномерными по размеру.
Редактировать II: Walkoround
Простой wlakoround может быть достигнут следующим кодом.Есть ли более изящный способ сделать это (больше в стиле Даск)?
def repartition(ddf, npartitions=None):
MAX_PART_SIZE = 100*1024
if npartitions is None:
npartitions = ddf.npartitions
one_row_size = sum([dt.itemsize for dt in ddf.dtypes])
length = len(ddf)
requested_part_size = length/npartitions*one_row_size
if requested_part_size <= MAX_PART_SIZE:
np = npartitions
else:
np = length*one_row_size/MAX_PART_SIZE
chunksize = int(length/np)
vc = ddf.index.value_counts().to_frame(name='count').compute().sort_index()
vsum = 0
divisions = [ddf.divisions[0]]
for i,v in vc.iterrows():
vsum+=v['count']
if vsum > chunksize:
divisions.append(i)
vsum = 0
divisions.append(ddf.divisions[-1])
return ddf.repartition(divisions=divisions, force=True)