Python использование многопроцессорности для ускорения слияния счетчиков - PullRequest
4 голосов
/ 30 апреля 2020

Я пытаюсь создать очень простую систему рекомендации предметов, используя количество раз купленных предметов,

, поэтому сначала я создал словарь item2item для счетчика, например

item2item = {'A': {'B': 4, 'C': 3}, 'B': {'A': 4, 'C': 2}, 'C':{'A': 3, 'B': 2}}

Например, люди покупали 'A' и 'B' вместе 4 раза.

Итак, если я получу образцы = ['A', 'C'], item2item ['A'] + item2item ['C '] дает наиболее часто покупаемые предметы

Однако, как вы могли ожидать, эта операция довольно тяжелая, поэтому я попытался использовать мультиобработку, как показано ниже

from tqdm import tqdm
from operator import add
from functools import reduce
from concurrent.futures import ProcessPoolExecutor
from collections import Counter

with ProcessPoolExecutor(max_workers=10) as pool:
    for samples in tqdm(sample_list):
        # w/o PoolExecutor
        # combined = reduce(add, [item2item[s] for s in samples], Counter())
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        combined = future.result()

Однако это не ускорил процесс вообще.

Я подозреваю, что счетчик в функции уменьшения не является общим, как ответили в Python многопроцессорная и общий счетчик и https://docs.python.org/3/library/multiprocessing.html#sharing -state-Между-процессами .

Честно говоря, я не знаю, как эффективно использовать классы Value, Array или счетчик перезаписи.

Любая помощь приветствуется.

1 Ответ

6 голосов
/ 04 мая 2020

Вызов combined = future.result() блокируется до тех пор, пока результат не будет завершен, поэтому вы не отправляете последующий запрос в пул, пока не завершится предыдущий запрос. Другими словами, вы никогда не запускаете более одного подпроцесса. Как минимум вы должны изменить свой код на:

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = []
    for samples in tqdm(sample_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures.append(future) # save it
    results = [f.result() for f in the_futures()] # all the results

Другой способ:

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = []
    for samples in tqdm(sample_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures.append(future) # save it
    # you need: from concurrent.futures import as_completed
    for future in as_completed(the_futures): # not necessarily the order of submission
        result = future.result() # do something with this

Кроме того, если вы не укажете max_workers на ProcessPoolExecutor конструктор, по умолчанию это число процессоров на вашей машине. Ничего нельзя получить, указав значение, превышающее количество процессоров, которые у вас есть на самом деле.

Обновление

Если вы хотите обработать результаты как только они завершены и им нужен способ вернуть результат * ie обратно к исходному запросу, вы можете сохранить фьючерсы в виде ключей в словаре, где соответствующие значения представляют аргументы запросов. В этом случае:

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = {}
    for samples in tqdm(sample_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures[future] = samples # map future to request
    # you need: from concurrent.futures import as_completed
    for future in as_completed(the_futures): # not necessarily the order of submission
        samples = the_futures[future] # the request
        result = future.result() # the result
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...