Редактировать: я обнаружил ошибку, я забыл фактически вернуть данные в моей функции агрегирования ... упс.Во всяком случае, есть несколько методов, перечисленных ниже, я оставлю их здесь для справки.
Оригинал: мне нужно применить довольно вычислительно дорогой групповой и агрегировать на фрейме данных панд.
df.groupby(['Col1','Col2'],as_index=False).aggregate(lambda x: list(x))
Я не могу использовать Map для этого (или, может быть, я больше не знаю), так как мне нужен доступ к нескольким строкам данных, чтобы сработала группа из двух столбцов.
Использование Pool.map просто возвращаеттот же самый df, как и ожидалось, потому что он передает данные строка за строкой, а не по группам или, по крайней мере, как целая часть.
То, что я хочу, просто:
- разбить Dataframe на n Slices / Chunks (я думаю, что np.array_split хорошо работает для этого),
- применить группирование и агрегирование для каждого слайса (просто нормальная операция для каждого слайса)
- затем объединить кусочки вместе (pd.concat)
это действительно не должно быть очень сложно, не так ли?Но каждый пример и учебник сосредоточены на построчных операциях, которые заставляют groupby / aggregate ничего не делать.
Спасибо за помощь:)
РЕДАКТИРОВАТЬ:
Iпопробовал:
from multiprocessing import Pool
num_partitions = 10 #number of partitions to split dataframe
num_cores = 4 #number of cores on your machine
def parallelize_dataframe(df, func):
df_split = np.array_split(df, num_partitions)
pool = Pool(num_cores)
df = pd.concat(pool.map(func, [group for name,group in df.groupby(['Col1','Col2'])]))
pool.close()
pool.join()
return df
def aggregate_fun(data):
data = data.groupby(['Col1','Col2'],as_index=False).aggregate(lambda x: list(x))
return data
EDIT2:
from concurrent.futures import ProcessPoolExecutor, as_completed
ppe = ProcessPoolExecutor(4)
futures = []
results = []
for group in np.split(df, 4):
p = ppe.submit(aggregate_fun, group)
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
Оба метода работают, если вы не забудете, чтобы ваша функция действительно что-то возвращала: P Однако она не такая большаяускорение, как я надеюсь, обратите внимание, что я забыл выделить переменную данных перед возвратом или просто возвратить результат агрегации ... Показывает, что кодирование не должно выполняться поздно днем.