Многопроцессорная обработка Python PicklingError: Can't pickle - PullRequest
187 голосов
/ 10 января 2012

Я сожалею, что не могу воспроизвести ошибку на более простом примере, и мой код слишком сложен для публикации.Если я запускаю программу в оболочке IPython вместо обычного Python, все работает хорошо.

Я просмотрел несколько предыдущих заметок по этой проблеме.Все они были вызваны использованием пула для вызова функции, определенной в функции класса.Но это не так для меня.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Буду признателен за любую помощь.

Обновление : Функция, которую я выбираю, определяется на верхнем уровне модуля.Хотя он вызывает функцию, которая содержит вложенную функцию.то есть f() вызывает g(), вызывает h(), у которого есть вложенная функция i(), а я звоню pool.apply_async(f).f(), g(), h() все определены на верхнем уровне.Я попробовал более простой пример с этим шаблоном, и он работает, хотя.

Ответы [ 6 ]

243 голосов
/ 10 января 2012

Вот список того, что можно мариновать .В частности, функции можно выбирать только в том случае, если они определены на верхнем уровне модуля.

Этот фрагмент кода:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

выдает ошибку, почти идентичную той, которую вы опубликовали:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Проблема в том, что все методы pool используют queue.Queue для передачи задач рабочим процессам.Все, что проходит через queue.Queue, должно быть доступно для выбора, а foo.work не может быть выбрано, поскольку оно не определено на верхнем уровне модуля.

Это можно исправить, определив функцию на верхнем уровне, который вызывает foo.work():

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

Обратите внимание, что foo выбирается, так как Foo определяется на верхнем уровне, а foo.__dict__ выбирается.

73 голосов
/ 25 января 2014

Я бы использовал pathos.multiprocesssing вместо multiprocessing.pathos.multiprocessing - это вилка multiprocessing, которая использует dill.dill может сериализовать почти все в Python, так что вы можете отправлять намного больше параллельно.Вилка pathos также может работать напрямую с несколькими аргументами, как вам нужно для методов класса.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

Получите pathos (и, если хотите, dill) здесь: https://github.com/uqfoundation

24 голосов
/ 10 июля 2014

Как уже говорили другие multiprocessing может передавать объекты Python только рабочим процессам, которые могут быть обработаны.Если вы не можете реорганизовать свой код, как описано в unutbu, вы можете использовать расширенные возможности выбора / удаления данных dill для передачи данных (особенно данных кода), как я покажу ниже.1005 * и никаких других библиотек как pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()
15 голосов
/ 31 октября 2012

Я обнаружил, что могу также сгенерировать именно эту ошибку на идеально работающем фрагменте кода, пытаясь использовать на нем профилировщик.

Обратите внимание, что это было в Windows (где разветвление немногоменее элегантно).

Я выполнял:

python -m profile -o output.pstats <script> 

и обнаружил, что удаление профилирования устранило ошибку, а размещение профилирования восстановило ее.Сводил меня с ума, потому что я знал, что код работал.Я проверял, не обновлял ли что-то файл pool.py ... затем у меня возникло чувство опускания, и он устранил профилирование. Вот и все.

Размещение здесь для архивов на случай, если кто-нибудь еще столкнется с ним.*

4 голосов
/ 27 сентября 2015

Это решение требует только установки укропа и никаких других библиотек, как пафос

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

Это также работает для numpy массивов.

1 голос
/ 26 мая 2017
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Эта ошибка также возникнет, если у вас есть какая-либо встроенная функция внутри объекта модели, которая была передана в асинхронное задание.

Поэтому убедитесь, что проверенные объекты модели не имеют встроенных функций.(В нашем случае мы использовали FieldTracker() функцию django-model-utils внутри модели для отслеживания определенного поля).Вот ссылка на соответствующую проблему GitHub.

...