Фильтр Dask DataFrame и повторное разбиение дают несколько пустых разделов - PullRequest
0 голосов
/ 08 мая 2020

Я пытаюсь отфильтровать Dask DataFrame, а затем использовать map_partitions, чтобы применить функцию к каждому разделу. Функция ожидает pandas DataFrame с как минимум 1 строкой.

Вот код для генерации некоторых фиктивных данных в виде pandas DataFrame (а затем преобразования в Dask DataFrame) для a MCVE

def create_data(n):
    df = pd.DataFrame(np.random.rand(6 * n), columns=["A"])
    random_integers = np.random.default_rng().choice(14, size=n, replace=False)
    df.insert(0, 'store_id', [d for s in random_integers for d in [s] * 6])
    return df

df = create_data(n=10)
print(df.head(15))
>>>
    store_id         A
0         10  0.850730
1         10  0.581119
2         10  0.825802
3         10  0.657797
4         10  0.291961
5         10  0.864984
6          9  0.161334
7          9  0.397162
8          9  0.089300
9          9  0.435914
10         9  0.750741
11         9  0.920625
12         3  0.635727
13         3  0.425270
14         3  0.904043

Структура данных: для каждой store_id ровно 6 строк.

Теперь я создаю список из некоторого количества store_id s, которые мне нужны чтобы использовать для фильтрации приведенных выше данных

filtered_store_ids = df["store_id"].value_counts().index[:6].tolist()
print(filtered_store_ids)
>>> [13, 12, 11, 10, 9, 7]

Затем я конвертирую указанные выше данные (a pandas DataFrame) в dask.dataframe

ddf = dd.from_pandas(df, npartitions=10)

Теперь я распечатываю разделы из ddf

for p in range(ddf.npartitions):
    print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=6
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=6
Partition Index=5, Number of Rows=6
Partition Index=6, Number of Rows=6
Partition Index=7, Number of Rows=6
Partition Index=8, Number of Rows=6
Partition Index=9, Number of Rows=6

Это ожидается. В каждом разделе 6 строк и один (уникальный) store_id. Итак, каждый раздел содержит данные для одного store_id.

Теперь я фильтрую фрейм данных Dask, используя список store_id s сверху

ddf = ddf[ddf["store_id"].isin(filtered_store_ids)]

Снова распечатываю разделы отфильтрованный ddf

for p in range(ddf.npartitions):
    print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=0
Partition Index=1, Number of Rows=0
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=0
Partition Index=5, Number of Rows=6
Partition Index=6, Number of Rows=6
Partition Index=7, Number of Rows=6
Partition Index=8, Number of Rows=0
Partition Index=9, Number of Rows=6

Это ожидается, поскольку каждый раздел имеет один store_id и, в результате фильтрации, некоторые разделы будут полностью отфильтрованы, поэтому они будут содержать нулевые строки.

Итак, теперь я повторно разделю отфильтрованные Dataframe на передовые практики Dask DataFrame

ddf = ddf.repartition(npartitions=len(filtered_store_ids))
print(ddf)
>>>
Dask DataFrame Structure:
              store_id        A
npartitions=6                  
0                int64  float64
6                  ...      ...
...                ...      ...
48                 ...      ...
59                 ...      ...
Dask Name: repartition, 47 tasks

Я ожидал, что эта операция повторного разделения приведет к только одинаковому размеру не- пустые разделы. Но , теперь, когда я повторно распечатываю разделы, я получаю результат, аналогичный предыдущему (неравные размеры разделов и некоторые пустые разделы), как будто повторного разделения не произошло

for p in range(ddf.npartitions):
    print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=0
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=12
Partition Index=5, Number of Rows=6

Мой следующий шаг - применить функцию к каждому разделу после фильтрации, но это не сработает, поскольку есть несколько разделов (pandas DataFrame s), которые функция не может обработать, поскольку в них отсутствуют строки.

def myadd(df):
    assert df.shape[0] > 0
    ...
    return ...

ddf.map_partitions(myadd)
>>> AssertionError                            Traceback (most recent call last)
.
.
.
AssertionError: 

Документация Dask для переразметки хорошо объяснена (то же самое для лучших практик, которые я привел выше), и это кажется достаточно простым, но после переразметки я все еще получить некоторые разделы с нулевыми строками и map_partitions здесь не удастся. Я уверен, что здесь чего-то не хватает.

Есть пара сообщений SO о переразметке ( 1 , 2 ), но они не касаются пустые разделы.

Вопрос

Есть ли способ гарантировать, что после повторного разбиения на все разделы снова будет 6 строк и не будет пустых разделов? то есть возможно ли иметь повторно разбитый Dask DataFrame с одинаковыми (непустыми) разделами?

EDIT

Похоже, что пустые разделы не могут быть разобрались в Даске, на данный момент: вопросы 1 , 2 . Это может быть связано с проблемой, с которой я столкнулся здесь.

1 Ответ

1 голос
/ 09 мая 2020

Я нашел два существующих сообщения из SO

  • удалить пустые разделы, используя cull_empty_partitions()
  • , чтобы получить равные размеры разделов, используя _rebalance_ddf()
    • предупреждение - эта функция требует вычисления

, и я использовал их для решения этой проблемы следующим образом.

Начните с исходного кода из вопросов (никаких изменений не требуется)

.
<identical code from question here>
.
ddf = ddf.repartition(npartitions=len(filtered_store_ids))

Затем я просто последовательно вызываю две функции в перераспределенном ddf

ddf = cull_empty_partitions(ddf)  # remove empties
ddf = _rebalance_ddf(ddf)         # re-size

Теперь, когда я повторно печатаю размеры разделов, все они имеют одинаковый размер и ни один не пустой

for p in range(ddf.npartitions):
    print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=6
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=6
Partition Index=5, Number of Rows=6
...