Использование многопроцессорной обработки python для a для l oop, который добавляет результаты в словарь - PullRequest
2 голосов
/ 04 февраля 2020

Итак, я посмотрел как документацию модуля многопроцессорной обработки, так и другие заданные здесь вопросы, но ни один из них не похож на мой случай, поэтому я начал новый вопрос.

Для простоты у меня есть фрагмент кода:

# simple dataframe of some users and their properties.
data = {'userId': [1, 2, 3, 4],
        'property': [12, 11, 13, 43]}
df = pd.DataFrame.from_dict(data)

# a function that generates permutations of the above users, in the form of a list of lists
# such as [[1,2,3,4], [2,1,3,4], [2,3,4,1], [2,4,1,3]]
user_perm = generate_permutations(nr_perm=4)

# a function that computes some relation between users
def comp_rel(df, permutation, user_dict):
    df1 = df.userId.isin(permutation[0])
    df2 = df.userId.isin(permutation[1])
    user_dict[permutation[0]] += permutation[1]
    return user_dict


# and finally a loop: 
user_dict = defaultdict(int)
for permutation in user_perm:
    user_dict = comp_rel(df, permutation, user_dict)    

Я знаю, что этот код имеет очень мало (если вообще есть) смысла сейчас, но я просто написал небольшой пример это близко к структуре реального кода, над которым я работаю. Это user_dict должно наконец содержать userIds и некоторое значение.

У меня есть реальный код, и он отлично работает, дает правильный dict и все, но ... он работает в одном потоке. И это мучительно медленно, учитывая, что у меня есть еще 15 совершенно бесплатных тем.

У меня вопрос: как я могу использовать модуль multiprocessing из python, чтобы изменить последний для l oop и иметь возможность работать на всех доступных потоках / ядрах? Я посмотрел на документацию, это не очень легко понять.

РЕДАКТИРОВАТЬ: я пытаюсь использовать пул как:

p = multiprocessing.Pool(multiprocessing.cpu_count())
p.map(comp_rel(df, permutation, user_dict), user_perm)
p.close()
p.join()

однако это ломается, потому что я использую строку:

user_dict = comp_rel(df, permutation, user_dict) 

в исходном коде, и я не знаю, как эти словари должны быть объединены после завершения пула.

Ответы [ 2 ]

2 голосов
/ 04 февраля 2020

В вашем comp_rel есть две части, которые необходимо разделить - во-первых, это сам расчет, который вычисляет некоторое значение для некоторого идентификатора пользователя. Второй шаг - это накопление, которое добавляет это значение к результату user_dict.

Вы можете разделить само вычисление так, чтобы оно возвращало кортеж (id, value) и обрабатывать его с помощью многопроцессорной обработки, а затем накапливать результаты впоследствии в основном потоке:

from multiprocessing import Pool
from functools import partial
from collections import defaultdict

# We make this a pure function that just returns a result instead of mutating anything
def comp_rel(df, perm):
    ...
    return perm[0], perm[1]

comp_with_df = partial(comp_rel, df) # df is always the same, so factor it out
with Pool(None) as pool: # Pool(None) uses cpu_count automatically
    results = pool.map(comp_with_df, user_perm)

# Now add up the results at the end:
user_dict = defaultdict(int)
for k, v in results:
    user_dict[k] += v

В качестве альтернативы вы также можете напрямую передать объект Manager().dict() в функцию обработки, но это немного сложнее и, скорее всего, не даст вам никакой дополнительной скорости.


Основываясь на предложении @ Masklinn, вот несколько лучший способ сделать это, чтобы избежать перегрузки памяти:

user_dict = defaultdict(int)
with Pool(None) as pool:
    for k, v in pool.imap_unordered(comp_with_df, user_perm):
        user_dict[k] += v

Таким образом, мы суммируем результаты по мере их завершения, а не необходимо сначала сохранить их все в промежуточном списке.

1 голос
/ 04 февраля 2020

После короткого обсуждения в комментариях я решил опубликовать решение, используя ProcessPoolExecutor:

import concurrent.futures
from collections import defaultdict

def comp_rel(df, perm):
    ...
    return perm[0], perm[1]

user_dict = defaultdict(int)
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = {executor.submit(comp_rel, df, perm): perm for perm in user_perm}
    for future in concurrent.futures.as_completed(futures):
        try:
            k, v = future.result()
        except Exception as e:
            print(f"{futures[future]} throws {e}")
        else:
            user_dict[k] += v

Он работает так же, как @tzaman, но дает вам возможность обрабатывать исключения , Также в этом модуле есть более интересные функции, проверьте docs .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...