Как заставить параллельную обработку работать в Python? - PullRequest
0 голосов
/ 20 октября 2019

Я пытаюсь сделать параллельную обработку в Python. У меня огромный массив данных с более чем 1001 строкой. В качестве примера, приведенного ниже, я хотел бы разделить фрейм данных (df will be divided into df1,df2) и применить один и тот же набор transpose operations к различным результирующим фреймам данных. Спасибо Jezrael за помощь в достижении этого уровня. Пожалуйста, найдите под моим входным фреймом данных

df = pd.DataFrame({
'subject_id':[1,1,1,1,2,2,2,2,3,3,4,4,4,4,4],
'readings' : ['READ_1','READ_2','READ_1','READ_3','READ_1','READ_5','READ_6','READ_8','READ_10','READ_12','READ_11','READ_14','READ_09','READ_08','READ_07'],
'val' :[5,6,7,11,5,7,16,12,13,56,32,13,45,43,46],
})

код для разделения фрейма данных

N=2  # dividing into two dataframes.
dfs = [x for _,x in df.groupby(pd.factorize(df['subject_id'])[0] // N)] # dfs is an iterable which will have two dataframes

параллельнообработка кода

import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
results = []

def transpose_ope(df):                      #this function does the transformation like I want
    df_op = (df.groupby(['subject_id','readings'])['val']
            .describe()
            .unstack()
            .swaplevel(0,1,axis=1)
            .reindex(df['readings'].unique(), axis=1, level=0))
    df_op.columns = df_op.columns.map('_'.join)
    df_op = df_op.reset_index()

results.append(pool.map(transpose_ope, [df for df in dfs])) # am I storing the output correctly here?

На самом деле, я хотел бы добавить выходные данные каждого этапа в основной фрейм данных.

Можете ли вы помочь мне сделать это? Мой код продолжает работать даже для 10-15 записей.

1 Ответ

1 голос
/ 20 октября 2019

Функция, которую вы используете в карте, должна возвращать нужный объект.

Я бы также использовал более идиоматический менеджер контекста, доступный для пула.

РЕДАКТИРОВАТЬ: Исправлен импорт

import multiprocessing as mp

def transpose_ope(df):                      #this function does the transformation like I want
    df_op = (df.groupby(['subject_id','readings'])['val']
            .describe()
            .unstack()
            .swaplevel(0,1,axis=1)
            .reindex(df['readings'].unique(), axis=1, level=0))
    df_op.columns = df_op.columns.map('_'.join)
    df_op = df_op.reset_index()
    return df_op


def main():

    with mp.Pool(mp.cpu_count()) as pool:
        res = pool.map(transpose_ope, [df for df in dfs])

if __name__=='__main__':
   main()

Не уверен, почему вы добавляете один список в другой список ... но если вы просто хотите получить окончательный список [преобразуется (df) для df в dfs], карта возвращает именно это.

...