multiprocessing.Pool - PicklingError: Невозможно выбрать <type 'thread.lock'>: поиск атрибута thread.lock не выполнен - PullRequest
31 голосов
/ 23 октября 2011

multiprocessing.Pool сводит меня с ума ...
Я хочу обновить множество пакетов, и для каждого из них я должен проверить, есть ли более большая версия или нет.Это делается с помощью функции check_one.
Основной код находится в методе Updater.update: там я создаю объект Pool и вызываю метод map().

Вот код:

def check_one(args):
    res, total, package, version = args
    i = res.qsize()
    logger.info('\r[{0:.1%} - {1}, {2} / {3}]',
        i / float(total), package, i, total, addn=False)
    try:
        json = PyPIJson(package).retrieve()
        new_version = Version(json['info']['version'])
    except Exception as e:
        logger.error('Error: Failed to fetch data for {0} ({1})', package, e)
        return
    if new_version > version:
        res.put_nowait((package, version, new_version, json))

class Updater(FileManager):

    # __init__ and other methods...

    def update(self):    
        logger.info('Searching for updates')
        packages = Queue.Queue()
        data = ((packages, self.set_len, dist.project_name, Version(dist.version)) \
            for dist in self.working_set)
        pool = multiprocessing.Pool()
        pool.map(check_one, data)
        pool.close()
        pool.join()
        while True:
            try:
                package, version, new_version, json = packages.get_nowait()
            except Queue.Empty:
                break
            txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(package,
                                                                                      new_version,
                                                                                      version)
            u = logger.ask(txt, bool=('upgrade version', 'keep working version'), dont_ask=self.yes)
            if u:
                self.upgrade(package, json, new_version)
            else:
                logger.info('{0} has not been upgraded', package)
        self._clean()
        logger.success('Updating finished successfully')

Когда я запускаю его, я получаю странную ошибку:

Searching for updates
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

Ответы [ 3 ]

28 голосов
/ 23 октября 2011
Многопроцессорная обработка

передает задачи (включая check_one и data) рабочим процессам через mp.SimpleQueue. В отличие от Queue.Queue s, все, что находится в mp.SimpleQueue, должно быть отборным. Queue.Queue s нельзя выбрать:

import multiprocessing as mp
import Queue

def foo(queue):
    pass

pool=mp.Pool()
q=Queue.Queue()

pool.map(foo,(q,))

дает это исключение:

UnpickleableError: Cannot pickle <type 'thread.lock'> objects

Ваш data включает packages, который является Queue.Queue. Это может быть источником проблемы.


Вот возможный обходной путь: Queue используется для двух целей:

  1. , чтобы узнать приблизительный размер (позвонив по номеру qsize)
  2. для сохранения результатов для последующего извлечения.

Вместо вызова qsize, чтобы разделить значение между несколькими процессами, мы могли бы использовать mp.Value.

Вместо сохранения результатов в очереди мы можем (и должны) просто возвращать значения из вызовов в check_one. pool.map собирает результаты в очередь своего собственного производства и возвращает результаты как возвращаемое значение pool.map.

Например:

import multiprocessing as mp
import Queue
import random
import logging

# logger=mp.log_to_stderr(logging.DEBUG)
logger = logging.getLogger(__name__)


qsize = mp.Value('i', 1)
def check_one(args):
    total, package, version = args
    i = qsize.value
    logger.info('\r[{0:.1%} - {1}, {2} / {3}]'.format(
        i / float(total), package, i, total))
    new_version = random.randrange(0,100)
    qsize.value += 1
    if new_version > version:
        return (package, version, new_version, None)
    else:
        return None

def update():    
    logger.info('Searching for updates')
    set_len=10
    data = ( (set_len, 'project-{0}'.format(i), random.randrange(0,100))
             for i in range(set_len) )
    pool = mp.Pool()
    results = pool.map(check_one, data)
    pool.close()
    pool.join()
    for result in results:
        if result is None: continue
        package, version, new_version, json = result
        txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(
            package, new_version, version)
        logger.info(txt)
    logger.info('Updating finished successfully')

if __name__=='__main__':
    logging.basicConfig(level=logging.DEBUG)
    update()
6 голосов
/ 29 июля 2016

После долгих копаний по аналогичной проблеме ...

Также выясняется, что ЛЮБОЙ объект, который содержит поток. Объект () НИКОГДА не будет работать с multiprocessing.Pool.

Вот пример

import multiprocessing as mp
import threading

class MyClass(object):
   def __init__(self):
      self.cond = threading.Condition()

def foo(mc):
   pass

pool=mp.Pool()
mc=MyClass()
pool.map(foo,(mc,))

Я запустил это с Python 2.7.5 и получил ту же ошибку:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
self.run()
  File "/usr/lib64/python2.7/threading.py", line 764, in run
self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

Но затем запустил его на python 3.4.1, и эта проблема была исправлена.

Хотя я еще не сталкивался с полезными обходными путями для тех из нас, кто все еще пользуется 2.7.x.

1 голос
/ 14 июня 2019

У меня возникла эта проблема с версией Python 3.6 в докере. Изменили версию на 3.7.3 и это было решено.

...