Многопроцессорность: как использовать Pool.map для функции, определенной в классе? - PullRequest
161 голосов
/ 20 июля 2010

Когда я запускаю что-то вроде:

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

работает нормально. Однако, помещая это как функцию класса:

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

Дает мне следующую ошибку:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Я видел сообщение от Алекса Мартелли, посвященное той же проблеме, но оно не было достаточно явным.

Ответы [ 16 ]

79 голосов
/ 18 апреля 2013

Я не смог использовать коды, опубликованные до сих пор, потому что коды, использующие «multiprocessing.Pool», не работают с лямбда-выражениями, а коды, не использующие «multiprocessing.Pool», порождают столько процессов, сколько есть рабочих элементов.

Я адаптировал код s.t. он порождает предопределенное количество рабочих и выполняет итерацию по списку ввода, только если существует неработающий работник. Я также включил режим "демон" для рабочих s.t. Ctrl-C работает как положено.

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
67 голосов
/ 26 апреля 2011

Меня также раздражали ограничения на то, какие функции pool.map может принимать. Я написал следующее, чтобы обойти это. Похоже, что это работает, даже для рекурсивного использования parmap.

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe,x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p,c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))
45 голосов
/ 25 января 2014

Многопроцессорная обработка и травление нарушены и ограничены, если вы не выйдете за пределы стандартной библиотеки.

Если вы используете форк multiprocessing с именем pathos.multiprocesssing, вы можете напрямую использовать классы и методы классов в многопроцессорных функциях map. Это потому, что dill используется вместо pickle или cPickle, а dill может сериализовать почти все в Python.

pathos.multiprocessing также предоставляет функцию асинхронного отображения ... и может map функции с несколькими аргументами (например, map(math.pow, [1,2,3], [4,5,6]))

Смотрите обсуждения: Что могут делать мультипроцессор и укроп вместе?

и: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

Он даже обрабатывает код, который вы написали изначально, без изменений и от интерпретатора. Зачем делать что-то еще более хрупкое и специфичное для одного случая?

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...  def run(self):
...   def f(x):
...    return x*x
...   p = Pool()
...   return p.map(f, [1,2,3])
... 
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]

Получить код здесь: https://github.com/uqfoundation/pathos

И, просто, чтобы похвастаться немного больше того, что он может сделать:

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]
39 голосов
/ 26 июля 2010

На данный момент нет решения вашей проблемы, насколько я знаю: функция, которую вы даете map(), должна быть доступна через импорт вашего модуля.Вот почему работает код Роберта: функцию f() можно получить, импортировав следующий код:

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()

Я фактически добавил «основной» раздел, потому что он соответствует рекомендациям для Windowsplatform («Убедитесь, что основной модуль может быть безопасно импортирован новым интерпретатором Python, не вызывая непреднамеренных побочных эффектов»).

Я также добавил заглавную букву перед Calculate, чтобыследовать PEP 8 .:)

18 голосов
/ 10 мая 2012

Решение по mrule верное, но имеет ошибку: если дочерний элемент отправляет обратно большой объем данных, он может заполнить буфер канала, блокируя дочерний элемент pipe.send(), пока родительский процесс ожидает выхода дочернего элемента на pipe.join(). Решение состоит в том, чтобы прочитать данные о ребенке перед тем, как join() сообщить ребенку. Кроме того, ребенок должен закрыть родительский конец трубы, чтобы предотвратить тупик. Код ниже исправляет это. Также имейте в виду, что parmap создает один процесс на элемент в X. Более сложное решение - использовать multiprocessing.cpu_count(), чтобы разделить X на несколько кусков, а затем объединить результаты перед возвратом. Я оставляю это как упражнение для читателя, чтобы не испортить краткость милого ответа от mrule. ;)

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe,x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p,c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))
13 голосов
/ 16 мая 2011

Я тоже с этим боролся. В качестве упрощенного примера у меня были функции в качестве членов данных класса:

from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f,list1,list2)  

Мне нужно было использовать функцию self.f в вызове Pool.map () из того же класса, и self.f не принял кортеж в качестве аргумента. Поскольку эта функция была встроена в класс, мне не было понятно, как написать тип оболочки, предложенный другими ответами.

Я решил эту проблему, используя другую оболочку, которая принимает кортеж / список, где первый элемент - это функция, а остальные элементы - аргументы этой функции, называемые eval_func_tuple (f_args). Используя это, проблемная строка может быть заменена возвращением pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)). Вот полный код:

Файл: util.py

def add(a, b): return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])  

Файл: main.py

from multiprocessing import Pool
import itertools
import util  

pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple, 
            itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)  

Запуск main.py даст [11, 22, 33]. Не стесняйтесь улучшить это, например, eval_func_tuple также может быть изменен, чтобы принимать аргументы ключевого слова.

С другой стороны, в других ответах функция «parmap» может быть сделана более эффективной для случая с большим количеством процессов, чем с числом доступных процессоров. Я копирую отредактированную версию ниже. Это мой первый пост, и я не был уверен, стоит ли мне напрямую редактировать исходный ответ. Я также переименовал некоторые переменные.

from multiprocessing import Process, Pipe  
from itertools import izip  

def spawn(f):  
    def fun(pipe,x):  
        pipe.send(f(x))  
        pipe.close()  
    return fun  

def parmap(f,X):  
    pipe=[Pipe() for x in X]  
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]  
    numProcesses = len(processes)  
    processNum = 0  
    outputList = []  
    while processNum < numProcesses:  
        endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)  
        for proc in processes[processNum:endProcessNum]:  
            proc.start()  
        for proc in processes[processNum:endProcessNum]:  
            proc.join()  
        for proc,c in pipe[processNum:endProcessNum]:  
            outputList.append(proc.recv())  
        processNum = endProcessNum  
    return outputList    

if __name__ == '__main__':  
    print parmap(lambda x:x**x,range(1,5))         
7 голосов
/ 28 мая 2016

Я взял ответ Клауса Се и Агандерса3 и сделал документированный модуль, который будет более читабельным и хранится в одном файле. Вы можете просто добавить его в свой проект. Он даже имеет дополнительный индикатор выполнения!

"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.

Adapted from http://stackoverflow.com/a/16071616/287297

Example usage:

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)

Comments:

"It spawns a predefined amount of workers and only iterates through the input list
 if there exists an idle worker. I also enabled the "daemon" mode for the workers so
 that KeyboardInterupt works as expected."

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.

Alternatively, use this fork of multiprocessing: 
https://github.com/uqfoundation/multiprocess
"""

# Modules #
import multiprocessing
from tqdm import tqdm

################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
    while not queue_in.empty():
        num, obj = queue_in.get()
        queue_out.put((num, func_to_apply(obj)))

################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
    # Number of processes to use #
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
    # Create queues #
    q_in  = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    # Process list #
    new_proc  = lambda t,a: multiprocessing.Process(target=t, args=a)
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
    # Put all the items (objects) in the queue #
    sent = [q_in.put((i, x)) for i, x in enumerate(items)]
    # Start them all #
    for proc in processes:
        proc.daemon = True
        proc.start()
    # Display progress bar or not #
    if verbose:
        results = [q_out.get() for x in tqdm(range(len(sent)))]
    else:
        results = [q_out.get() for x in range(len(sent))]
    # Wait for them to finish #
    for proc in processes: proc.join()
    # Return results #
    return [x for i, x in sorted(results)]

################################################################################
def test():
    def slow_square(x):
        import time
        time.sleep(2)
        return x**2
    objs    = range(20)
    squares = prll_map(slow_square, objs, 4, verbose=True)
    print "Result: %s" % squares

РЕДАКТИРОВАТЬ : Добавлено предложение @ alexander-mcfarlane и функция тестирования

7 голосов
/ 20 июля 2010

Функции, определенные в классах (даже внутри функций в классах), на самом деле не работают. Однако это работает:

def f(x):
    return x*x

class calculate(object):
    def run(self):
        p = Pool()
    return p.map(f, [1,2,3])

cl = calculate()
print cl.run()
6 голосов
/ 22 апреля 2017

Я знаю, что об этом спрашивали более 6 лет назад, но я просто хотел добавить свое решение, поскольку некоторые из приведенных выше предложений кажутся ужасно сложными, но мое решение на самом деле было очень простым.

Все, что мне нужно было сделать, это обернуть вызов pool.map () во вспомогательную функцию. Передача объекта класса вместе с аргументами для метода в виде кортежа, который выглядел примерно так:

def run_in_parallel(args):
    return args[0].method(args[1])

myclass = MyClass()
method_args = [1,2,3,4,5,6]
args_map = [ (myclass, arg) for arg in method_args ]
pool = Pool()
pool.map(run_in_parallel, args_map)
3 голосов
/ 03 сентября 2015

Я изменил метод Клауса Се, потому что, пока он работал для меня с небольшими списками, он зависал, когда число элементов было ~ 1000 или больше.Вместо того, чтобы запускать задания по одному с условием остановки None, я загружаю очередь ввода сразу и просто позволяю процессам жевать ее до тех пор, пока она не станет пустой.

from multiprocessing import cpu_count, Queue, Process

def apply_func(f, q_in, q_out):
    while not q_in.empty():
        i, x = q_in.get()
        q_out.put((i, f(x)))

# map a function using a pool of processes
def parmap(f, X, nprocs = cpu_count()):
    q_in, q_out   = Queue(), Queue()
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [p.start() for p in proc]
    res = [q_out.get() for _ in sent]
    [p.join() for p in proc]

    return [x for i,x in sorted(res)]

Редактировать: к сожалениютеперь я сталкиваюсь с этой ошибкой в ​​моей системе: Максимальный размер очереди многопроцессорной обработки равен 32767 , надеюсь, обходные пути там помогут.

...