Python 2.6 multiprocessing.Queue совместим с потоками? - PullRequest
7 голосов
/ 05 декабря 2008

Я экспериментирую с новым многопроцессорным модулем в Python 2.6. Я создаю несколько процессов, каждый из которых имеет свой собственный экземпляр multiprocessor.JoinableQueue. Каждый процесс порождает один или несколько рабочих потоков (подклассов threading.Thread), которые совместно используют экземпляр JoinableQueue (передаваемый через метод __init__ каждого потока). Кажется, что в целом это работает, но иногда и непредсказуемо терпит неудачу со следующей ошибкой:

  File "C:\Documents and Settings\Brian\Desktop\testscript.py", line 49, in run
    self.queue.task_done()
  File "C:\Python26\lib\multiprocessing\queues.py", line 293, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times

Мои вызовы get () и task_done () выполняются сразу друг за другом, поэтому они должны быть равны. Как ни странно, это происходит только тогда, когда работа, выполняемая между get () и task_done (), выполняется ОЧЕНЬ быстро. Вставка небольшого time.sleep(0.01), кажется, облегчает проблему.

Есть идеи, что происходит? Могу ли я использовать многопроцессорную очередь с потоками вместо более традиционной (Queue.Queue)?

Спасибо!

-Брайан

Ответы [ 4 ]

4 голосов
/ 05 декабря 2008

Я еще не экспериментировал с мультипроцессором в 2.6, но я много играл с пиропроцессором (как это называлось в 2.5).

Я вижу, что вы ищете несколько процессов, каждый из которых порождает набор потоков соответственно.

Поскольку вы используете многопроцессорный модуль, я предлагаю использовать многопроцессорный, а не многопоточный подход, вы будете сталкиваться с меньшими проблемами, такими как взаимоблокировки и т. Д.

Создать объект очереди. http://pyprocessing.berlios.de/doc/queue-objects.html

Для создания многопроцессорной среды используйте пул: http://pyprocessing.berlios.de/doc/pool-objects.html, который будет управлять рабочими процессами за вас. Затем вы можете применить асинхронный / синхронный к работникам, а также можете добавить обратный вызов для каждого работника, если это необходимо. Но помните, что обратный вызов - это обычный блок кода, и он должен немедленно вернуться (как указано в документации)

Некоторая дополнительная информация: При необходимости создайте менеджер http://pyprocessing.berlios.de/doc/manager-objects.html для управления доступом к объекту очереди. Вы должны будете сделать объект очереди доступным для этого. Но преимущество в том, что после совместного использования и управления вы можете получить доступ к этой общей очереди по всей сети, создав прокси-объекты. Это позволит вам вызывать методы объекта централизованной общей очереди как (очевидно) собственные методы на любом сетевом узле.

вот пример кода из документации

Можно запустить сервер менеджера на одном компьютере, и клиенты могут использовать его с других компьютеров (при условии, что соответствующие брандмауэры позволяют это). Выполнение следующих команд создает сервер для общей очереди, который могут использовать удаленные клиенты:

>>> from processing.managers import BaseManager, CreatorMethod
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager):
...     get_proxy = CreatorMethod(callable=lambda:queue, typeid='get_proxy')
...
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='none')
>>> m.serve_forever()

Один клиент может получить доступ к серверу следующим образом:

>>> from processing.managers import BaseManager, CreatorMethod
>>> class QueueManager(BaseManager):
...     get_proxy = CreatorMethod(typeid='get_proxy')
...
>>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none')
>>> queue = m.get_proxy()
>>> queue.put('hello')

Если вы настаиваете на безопасном материале с резьбой, PEP371 (многопроцессорная обработка) ссылается на это http://code.google.com/p/python-safethread/

2 голосов
/ 05 декабря 2008

Вы должны передать объекты Queue в качестве аргументов цели.

Пример из документации многопроцессорной обработки :

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

 if __name__ == '__main__':
     q = Queue()
     p = Process(target=f, args=(q,))
     p.start()
     print q.get()    # prints "[42, None, 'hello']"
     p.join()

Очереди потокобезопасны и безопасны для обработки.

1 голос
/ 07 февраля 2010

Вы можете столкнуться с этой ошибкой:

http://bugs.python.org/issue4660

0 голосов
/ 05 декабря 2008

Спасибо за быстрый ответ. Я передаю экземпляры multiprocessing.Queue в качестве аргументов каждому процессу, как вы иллюстрируете. Кажется, что сбой происходит в потоках. Я создаю их путем создания подклассов threading.Thread и передачи очереди методу ' init ' каждого экземпляра потока. Похоже, что это приемлемый способ передачи очереди в подклассы потока. Я только подумал, что многопроцессорные очереди могут быть несовместимы с потоками (хотя они предположительно поточно-ориентированы).

...