Multiprocessing.pool с функцией, которая имеет несколько аргументов и kwargs - PullRequest
1 голос
/ 15 января 2020

Я бы хотел распараллелить вычисления, используя метод mutliprocessing.pool. Проблема состоит в том, что функция, которую я хотел бы использовать в вычислениях, представляет два аргумента и необязательные kwargs, являясь первым аргументом в фрейме данных, вторым - в str, а любой kwargs - в словаре.

И фрейм данных, и Словарь, который я хочу использовать, одинаков для всех вычислений, которые я пытаюсь выполнить, являясь лишь вторым аргументом, который постоянно меняется. Поэтому я надеялся, что смогу передать его в виде списка различных строк с помощью метода map в уже упакованную функцию с df и dict.

from utils import *
import multiprocessing
from functools import partial



def sumifs(df, result_col, **kwargs):

    compare_cols = list(kwargs.keys())
    operators = {}
    for col in compare_cols:
        if type(kwargs[col]) == tuple:
            operators[col] = kwargs[col][0]
            kwargs[col] = list(kwargs[col][1])
        else:
            operators[col] = operator.eq
            kwargs[col] = list(kwargs[col])
    result = []
    cache = {}
    # Go through each value
    for i in range(len(kwargs[compare_cols[0]])):
        compare_values = [kwargs[col][i] for col in compare_cols]
        cache_key = ','.join([str(s) for s in compare_values])
        if (cache_key in cache):
            entry = cache[cache_key]
        else:
            df_copy = df.copy()
            for compare_col, compare_value in zip(compare_cols, compare_values):
                df_copy = df_copy.loc[operators[compare_col](df_copy[compare_col], compare_value)]
            entry = df_copy[result_col].sum()
            cache[cache_key] = entry
        result.append(entry)
    return pd.Series(result)

if __name__ == '__main__':

    ca = read_in_table('Tab1')
    total_consumer_ids = len(ca)

    base = pd.DataFrame()
    base['ID'] = range(1, total_consumer_ids + 1)


    result_col= ['A', 'B', 'C']
    keywords = {'Z': base['Consumer archetype ID']}

    max_number_processes = multiprocessing.cpu_count()
    with multiprocessing.Pool(processes=max_number_processes) as pool:
        results = pool.map(partial(sumifs, a=ca, kwargs=keywords), result_col)
    print(results)

Однако, когда я запускаю приведенный выше код, я получаю следующую ошибку: TypeError: sumifs() missing 1 required positional argument: 'result_col'. Как я могу предоставить функции первый аргумент arg и kwargs, а второй аргумент представить в виде списка str, чтобы я мог парализовать вычисления? Я прочитал несколько похожих вопросов на форуме, но ни одно из решений, похоже, не работает для этого случая ...

Спасибо и извинения, если что-то не понятно, я только что узнал о пакете многопроцессорности сегодня!

Ответы [ 2 ]

2 голосов
/ 16 января 2020

Давайте посмотрим на две части вашего кода.

Во-первых, объявление функции sumifs:

def sumifs(df, result_col, **kwargs):

Во-вторых, вызов этой функции с соответствующими параметрами.

# Those are the params
ca = read_in_table('Tab1')
keywords = {'Z': base['Consumer archetype ID']}

# This is the function call
results = pool.map(partial(sumifs, a=ca, kwargs=keywords), tasks)

Обновление 1:

После того, как исходный код был отредактирован. Похоже, что проблема заключается в назначении позиционного аргумента, попробуйте его отбросить.

заменить строку:

results = pool.map(partial(sumifs, a=ca, kwargs=keywords), result_col)

на:

results = pool.map(partial(sumifs, ca, **keywords), result_col)

Пример кода:

import multiprocessing
from functools import partial

def test_func(arg1, arg2, **kwargs):
    print(arg1)
    print(arg2)
    print(kwargs)
    return arg2

if __name__ == '__main__':
    list_of_args2 = [1, 2, 3]
    just_a_dict = {'key1': 'Some value'}
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.map(partial(test_func, 'This is arg1', **just_a_dict), list_of_args2)
    print(results)

Будет выводить:

This is arg1
1
{'key1': 'Some value'}
This is arg1
2
{'key1': 'Some value'}
This is arg1
2
{'key1': 'Some value'}
['1', '2', '3']

Еще один пример того, как Multiprocessing.pool с функцией, которая имеет несколько аргументов и kwargs


Обновление 2:

Расширенный пример (из-за к комментариям):

Интересно, однако, тем же образом, если в моей функции было три аргумента и kwargs, и я хотел сохранить arg1, arg3 и kwargs стоимостным, как я мог передать arg2 как список для многопроцессорной обработки? По сути, как я буду указывать многопроцессорную обработку этой карты (частичная (test_fun c, 'Это arg1', 'Это будет arg3', ** just_a_dict), arg2) второе значение частично соответствует arg3, а не arg2?

Код Обновление 1 мог бы измениться следующим образом:

# The function signature
def test_func(arg1, arg2, arg3, **kwargs):

# The map call
pool.map(partial(test_func, 'This is arg1', arg3='This is arg3', **just_a_dict), list_of_args2)

Это можно сделать с помощью позиционного и ключевого назначения python . Обратите внимание, что kwargs оставлен в стороне и не назначен с использованием ключевого слова , несмотря на тот факт, что он расположен после назначенного значения ключевого слова .

Дополнительная информация о назначении аргумента различия можно найти здесь .

1 голос
/ 16 января 2020

Если существует фрагмент данных, который является постоянным / фиксированным для всех работ / заданий, тогда лучше «инициализировать» процессы в пуле с этими фиксированными данными во время создания пула и отобразить изменяющиеся данные , Это позволяет избежать повторной отправки фиксированных данных при каждом запросе на работу. В вашем случае я бы сделал что-то вроде следующего:

df = None
kw = {}

def initialize(df_in, kw_in):
    global df, kw
    df, kw = df_in, kw_in

def worker(data):
    # computation involving df, kw, and data
    ...

...
    with multiprocessing.Pool(max_number_processes, intializer, (base, keywords)) as pool:
        pool.map(worker, varying_data)

Этот gist содержит полный пример использования инициализатора. Это сообщение в блоге объясняет увеличение производительности при использовании инициализатора.

...