Простая параллельная обработка кусков панд Dataframe с групповой и агрегатной операциями - PullRequest
0 голосов
/ 27 февраля 2019

Редактировать: я обнаружил ошибку, я забыл фактически вернуть данные в моей функции агрегирования ... упс.Во всяком случае, есть несколько методов, перечисленных ниже, я оставлю их здесь для справки.

Оригинал: мне нужно применить довольно вычислительно дорогой групповой и агрегировать на фрейме данных панд.

df.groupby(['Col1','Col2'],as_index=False).aggregate(lambda x: list(x))

Я не могу использовать Map для этого (или, может быть, я больше не знаю), так как мне нужен доступ к нескольким строкам данных, чтобы сработала группа из двух столбцов.
Использование Pool.map просто возвращаеттот же самый df, как и ожидалось, потому что он передает данные строка за строкой, а не по группам или, по крайней мере, как целая часть.

То, что я хочу, просто:

  • разбить Dataframe на n Slices / Chunks (я думаю, что np.array_split хорошо работает для этого),
  • применить группирование и агрегирование для каждого слайса (просто нормальная операция для каждого слайса)
  • затем объединить кусочки вместе (pd.concat)

это действительно не должно быть очень сложно, не так ли?Но каждый пример и учебник сосредоточены на построчных операциях, которые заставляют groupby / aggregate ничего не делать.

Спасибо за помощь:)

РЕДАКТИРОВАТЬ:

Iпопробовал:

from multiprocessing import Pool
num_partitions = 10 #number of partitions to split dataframe
num_cores = 4 #number of cores on your machine
def parallelize_dataframe(df, func):
    df_split = np.array_split(df, num_partitions)
    pool = Pool(num_cores)
    df = pd.concat(pool.map(func, [group for name,group in df.groupby(['Col1','Col2'])]))
    pool.close()
    pool.join()
    return df
def aggregate_fun(data):
    data = data.groupby(['Col1','Col2'],as_index=False).aggregate(lambda x: list(x))
    return data

EDIT2:

from concurrent.futures import ProcessPoolExecutor, as_completed
ppe = ProcessPoolExecutor(4)

futures = []
results = []
for group in np.split(df, 4):
    p = ppe.submit(aggregate_fun, group)
    futures.append(p)

for future in as_completed(futures):
    r = future.result()
    results.append(r)

df_output = pd.concat(results) 

Оба метода работают, если вы не забудете, чтобы ваша функция действительно что-то возвращала: P Однако она не такая большаяускорение, как я надеюсь, обратите внимание, что я забыл выделить переменную данных перед возвратом или просто возвратить результат агрегации ... Показывает, что кодирование не должно выполняться поздно днем.

...