Как обернуть функции списка в python? - PullRequest
1 голос
/ 06 мая 2020

Я не могу точно отразить эту проблему в заголовке. Я хочу использовать list, func(*args) и Pool.map без ошибок. См. Ниже.

▼ Код

def df_parallelize_run(func, arguments):
    p = Pool(psutil.cpu_count())
    df = p.map(func, arguments)
    p.close()
    p.join()
    return df
def make_lag(df: DataFrame, LAG_DAY: list):
    for l in LAG_DAY:
        df[f'lag{l}d'] = df.groupby(['id'])['target'].transform(lambda x: x.shift(l))

    return df
def wrap_make_lag(args):
    return make_lag(*args)

Учитывая три вышеупомянутые функции, я хочу выполнить следующие действия:

# df: DataFrame
arguments = (df, [1, 3, 7, 13, 16])
df = df_parallelize_run(wrap_make_lag, arguments)

▼ Ошибка

in df_parallelize_run(func, arguments)
----> 7     df = pool.map(func, arguments)

in ..../python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
--> 268         return self._map_async(func, iterable, mapstar, chunksize).get()

in ..../python3.7/multiprocessing/pool.py in get(self, timeout)
--> 657             raise self._value

TypeError: make_lag() takes 2 positional arguments but 5 were given

Я знаю причину этого несоответствия (из-за распаковки списка, [1, 3, 7, 13, 16], это 5). Как правильно делать? Если возможно, я хочу уместить этот список в пределах ограничений позиционных аргументов. Если это практически невозможно (list или Pool.map), что более подходящий, простой и гибкий способ?

Ответы [ 2 ]

2 голосов
/ 06 мая 2020

Используйте pool.starmap. Вы создаете список кортежей для аргументов вашей функции. Здесь похоже, что df каждый раз одно и то же, а arg - это каждый элемент в arguments.

arglist = [(df, arg) for arg in arguments]
with multiprocessing.Pool(multiprocessing.cpu_count()) as p:
    results = p.starmap(make_lag, arglist)

0 голосов
/ 06 мая 2020

Решено. Я переписал следующим образом.

▼ Функции

def df_parallelize_run(func, arglist):    
    with Pool(psutil.cpu_count()) as p:
        # concat((lots of returned df))
        results = pd.concat(p.starmap(func, arglist), 1)
    return results
def make_lag(df, lag):
    if not isinstance(lag, list):
        lag = [lag]

    # it doesn't have to be for-loop when you use multiprocessing
    for l in lag:
        col_name = f'lag{l}d'
        df[col_name] = df.groupby(['item_id', 'store_id'])['sales'].transform(lambda x: x.shift(l))

    return df[[col_name]]

Другая функция

def make_lag_roll(df, lag, roll):
    col_name = f'lag{lag}_roll_mean_{roll}'
    df[col_name] = df.groupby(['id'])['target'].transform(lambda x: x.shift(lag).rolling(roll).mean())

    return df[[col_name]]

▼ Как использовать

arglist =  [(df[['id', 'target']], arg) for arg in range(1, 36)]

lag_df = df_parallelize_run(make_lag, arglist)
arglist_roll = [(df[['id', 'target']], lag, roll)
               for lag in range(1, 36)
               for roll in [7, 14, 28]]

lag_roll_df = df_parallelize_run(make_lag_roll, arglist_roll)
...