Я тоже с этим боролся. В качестве упрощенного примера у меня были функции в качестве членов данных класса:
from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
def __init__(self, my_add):
self.f = my_add
def add_lists(self, list1, list2):
# Needed to do something like this (the following line won't work)
return pool.map(self.f,list1,list2)
Мне нужно было использовать функцию self.f в вызове Pool.map () из того же класса, и self.f не принял кортеж в качестве аргумента. Поскольку эта функция была встроена в класс, мне не было понятно, как написать тип оболочки, предложенный другими ответами.
Я решил эту проблему, используя другую оболочку, которая принимает кортеж / список, где первый элемент - это функция, а остальные элементы - аргументы этой функции, называемые eval_func_tuple (f_args). Используя это, проблемная строка может быть заменена возвращением pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)). Вот полный код:
Файл: util.py
def add(a, b): return a+b
def eval_func_tuple(f_args):
"""Takes a tuple of a function and args, evaluates and returns result"""
return f_args[0](*f_args[1:])
Файл: main.py
from multiprocessing import Pool
import itertools
import util
pool = Pool()
class Example(object):
def __init__(self, my_add):
self.f = my_add
def add_lists(self, list1, list2):
# The following line will now work
return pool.map(util.eval_func_tuple,
itertools.izip(itertools.repeat(self.f), list1, list2))
if __name__ == '__main__':
myExample = Example(util.add)
list1 = [1, 2, 3]
list2 = [10, 20, 30]
print myExample.add_lists(list1, list2)
Запуск main.py даст [11, 22, 33]. Не стесняйтесь улучшить это, например, eval_func_tuple также может быть изменен, чтобы принимать аргументы ключевого слова.
С другой стороны, в других ответах функция «parmap» может быть сделана более эффективной для случая с большим количеством процессов, чем с числом доступных процессоров. Я копирую отредактированную версию ниже. Это мой первый пост, и я не был уверен, стоит ли мне напрямую редактировать исходный ответ. Я также переименовал некоторые переменные.
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
def fun(pipe,x):
pipe.send(f(x))
pipe.close()
return fun
def parmap(f,X):
pipe=[Pipe() for x in X]
processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]
numProcesses = len(processes)
processNum = 0
outputList = []
while processNum < numProcesses:
endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)
for proc in processes[processNum:endProcessNum]:
proc.start()
for proc in processes[processNum:endProcessNum]:
proc.join()
for proc,c in pipe[processNum:endProcessNum]:
outputList.append(proc.recv())
processNum = endProcessNum
return outputList
if __name__ == '__main__':
print parmap(lambda x:x**x,range(1,5))