Передел Dask DataFrame, чтобы получить даже разделы - PullRequest
0 голосов
/ 04 октября 2018

У меня есть Dask DataFrames , который содержит индекс, который не является уникальным (client_id).Перераспределение и сброс индекса заканчиваются очень неравномерными разделами - некоторые содержат только несколько строк, некоторые тысячи.Например, следующий код:

for p in range(ddd.npartitions):
    print(len(ddd.get_partition(p)))

распечатывает что-то подобное:

55
17
5
41
51
1144
4391
75153
138970
197105
409466
415925
486076
306377
543998
395974
530056
374293
237
12
104
52
28

Мой DataFrame имеет горячее кодирование и содержит более 500 столбцов.Большие разделы не помещаются в памяти.Я хотел перераспределить DataFrame, чтобы иметь разделы даже по размеру.Знаете ли вы эффективный способ сделать это?

РЕДАКТИРОВАТЬ 1

Простое воспроизведение:

df = pd.DataFrame({'x':np.arange(0,10000),'y':np.arange(0,10000)})
df2 = pd.DataFrame({'x':np.append(np.arange(0,4995),np.arange(5000,10000,1000)),'y2':np.arange(0,10000,2)})
dd_df = dd.from_pandas(df, npartitions=10).set_index('x')
dd_df2= dd.from_pandas(df2, npartitions=5).set_index('x')
new_ddf=dd_df.merge(dd_df2, how='right')
#new_ddf = new_ddf.reset_index().set_index('x')
#new_ddf = new_ddf.repartition(npartitions=2)
new_ddf.divisions
for p in range(new_ddf.npartitions):
    print(len(new_ddf.get_partition(p)))

Обратите внимание на последние разделы (один элемент):

1000
1000
1000
1000
995
1
1
1
1
1

Даже когда мы раскомментируем закомментированные строки, разделы остаются неравномерными по размеру.

Редактировать II: Walkoround

Простой wlakoround может быть достигнут следующим кодом.Есть ли более изящный способ сделать это (больше в стиле Даск)?

def repartition(ddf, npartitions=None):
    MAX_PART_SIZE = 100*1024

    if npartitions is None:
        npartitions = ddf.npartitions

    one_row_size = sum([dt.itemsize for dt in ddf.dtypes])
    length = len(ddf)

    requested_part_size = length/npartitions*one_row_size
    if requested_part_size <= MAX_PART_SIZE:
        np = npartitions
    else:
        np = length*one_row_size/MAX_PART_SIZE

    chunksize = int(length/np)


    vc = ddf.index.value_counts().to_frame(name='count').compute().sort_index()

    vsum = 0
    divisions = [ddf.divisions[0]]
    for i,v in vc.iterrows():
        vsum+=v['count']
        if vsum > chunksize:
            divisions.append(i)
            vsum = 0
    divisions.append(ddf.divisions[-1])


    return ddf.repartition(divisions=divisions, force=True)

1 Ответ

0 голосов
/ 07 мая 2019

Вы правы, что .repartition не сработает, так как не обрабатывает логику для вычислительных подразделений и просто пытается объединить существующие разделы, где это возможно.Вот решение, которое я придумал для той же проблемы:

def _rebalance_ddf(ddf):
    """Repartition dask dataframe to ensure that partitions are roughly equal size.

    Assumes `ddf.index` is already sorted.
    """
    if not ddf.known_divisions:  # e.g. for read_parquet(..., infer_divisions=False)
        ddf = ddf.reset_index().set_index(ddf.index.name, sorted=True)
    index_counts = ddf.map_partitions(lambda _df: _df.index.value_counts().sort_index()).compute()
    index = np.repeat(index_counts.index, index_counts.values)
    divisions, _ = dd.io.io.sorted_division_locations(index, npartitions=ddf.npartitions)
    return ddf.repartition(divisions=divisions)

Внутренняя функция sorted_division_locations уже делает то, что вы хотите, но она работает только с реальным списком, а не с ленивым dask.dataframe.Index,Это позволяет избежать извлечения полного индекса в случае, если имеется много дубликатов, и вместо этого просто получает отсчеты и восстанавливает их локально.

Если ваш фрейм данных настолько велик, что даже индекс не помещается в памяти, тогда вы бынужно сделать что-то еще более умное.

...