У меня есть набор данных, для которого я хочу выполнить простую операцию группировки / подсчета, и я, кажется, не могу сделать это с помощью dask.
Скорее всего, я не понимаю, какГруппировка / уменьшение выполняется в режиме dask, особенно когда индекс находится в ключе группировки.Итак, я проиллюстрирую мою проблему с игрушечными данными.
Итак, сначала я создаю фрейм данных с 3 столбцами.
import pandas as pd
import numpy as np
np.random.seed(0)
df = pd.DataFrame(
{"A": np.random.randint(6, size=20),
"B": np.random.randint(6, size=20),
"C": np.random.rand(20)}
)
df = df.set_index("A")
Итак, у меня есть фрейм данных с индексом и 2 столбцами.В пандах я бы сделал:
result = df.groupby(["A", "B"]).C.count().reset_index().set_index("A")
В конце я захочу сохранить результат в паркетных файлах.
Теперь давайте перейдем к делу, я могу выполнять в основном те же операции:
import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=4)
result = ddf.groupby(["A", "B"]).C.count().reset_index().set_index("A")
Вызов compute приводит к тому же результату.Однако при проверке графика я нахожу:
График вычислений для GroupBy / Count
Мы видим, что все сводится к одному разделу.Я могу понять, что это более или менее необходимо, когда данные, которые нужно сгруппировать, распределяются по разным разделам или когда число фактических ключей группировки невелико.
Но в моем случае индекс находится в разделе, поэтому я хотел быожидать полностью параллельный график.Кроме того, количество ключей группировки имеет тот же порядок величины, что и количество начальных строк (деленное на коэффициент 2 или 3).
Как и тот, который получен с помощью этого кода:
result = ddf.map_partitions(
lambda x: x.groupby(
[x.index, x.B]
).C.count().reset_index().set_index("A")
)
, который дает следующий график: График параллельных вычислений
Так есть ли способ получить этот параллельный граф с обычными групповыми / уменьшающими функциями dask?
Это очень важно, когда количество группирующих ключей очень велико.