Broken Pipe при использовании Python Multiprocessing Manager (BaseManager / SyncManager) для совместного использования очереди с удаленными компьютерами - PullRequest
14 голосов
/ 06 сентября 2010

В прошлом месяце у нас была постоянная проблема с многопроцессорным пакетом Python 2.6.x, когда мы пытались использовать его для разделения очереди между несколькими разными (linux) компьютерами.Я задал этот вопрос непосредственно Джесси Ноллеру, так как мы еще не нашли ничего, что объясняло бы проблему в StackOverflow, документах Python, исходном коде или где-либо еще в Интернете.

Наша команда инженеров еще не былаудалось решить эту проблему, и мы поставили вопрос перед довольно многими людьми в группах пользователей python, но безрезультатно.Я надеялся, что кто-то может пролить некоторую проницательность, поскольку я чувствую, что мы делаем что-то неправильно, но слишком близки к проблеме, чтобы понять, что это такое.

Вот симптом:

Traceback (most recent call last):
  File "/var/django_root/dev/com/brightscope/data/processes/daemons/deferredupdates/servers/queue_server.py", line 65, in get_from_queue
    return queue, queue.get(block=False)
  File "<string>", line 2, in get
  File "/usr/local/lib/python2.6/multiprocessing/managers.py", line 725, in _callmethod
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe

(я показываю, где наш код вызывает queue.get () для объекта общей очереди, размещенного менеджером, который расширяет SyncManger).

Что особенного в этой проблеме, так это то, что если мы подключаемся к этой общей очереди на одной машине (назовем это machine A), даже из множества параллельных процессов, мы, похоже, никогда не столкнемся с проблемой.Только когда мы подключаемся к очереди (опять же, используя класс, который расширяет многопроцессорность SyncManager и в настоящее время не добавляет дополнительных функций) с других компьютеров (давайте назовем эти machines B and C) и запустим большое количество элементов в и из очереди вв то же время, когда мы сталкиваемся с проблемой.

Как будто многопроцессорный пакет python обрабатывает локальные соединения (хотя они все еще используют тот же метод соединения manager.connect ()) способом, который работает с machine A но когда удаленные соединения выполняются одновременно, по крайней мере, с одним из machines B or C, мы получаем ошибку Сломанный канал.

Во всех чтениях, выполненных моей командой, мы думали, что проблема связана с блокировкой.Мы подумали, что, возможно, нам не следует использовать Queue.Queue, а вместо этого multiprocessing.Queue, но мы переключились, и проблема осталась (мы также заметили, что собственная общая очередь SyncManager является экземпляром Queue.Queue).

Мы стараемся изо всех сил отладить проблему, поскольку ее трудно воспроизвести, но это происходит довольно часто (много раз в день, если мы вставляем и .get () много элементов изочередь).

Метод, который мы создали get_from_queue, пытается повторить попытку получения элемента из очереди ~ 10 раз с рандомизированными интервалами ожидания, но, похоже, что если он один раз потерпит неудачу, он потерпит неудачу все десять раз (что привело меня кПолагаю, что .register () и .connect () для менеджера, возможно, не дают другого сокет-соединения с сервером, но я не мог подтвердить это, либо читая документы, либо просматривая внутренний исходный код Python).

Может ли кто-нибудь рассказать о том, куда мы можем посмотреть или как мы можем отследить, что на самом деле происходит?

Как мы можем начать новое соединение в случае разрыва трубы, используя multiprocessing.BaseManager или multiprocessing.SyncManager?

Как мы можем предотвратить поломку трубы?

Ответы [ 3 ]

9 голосов
/ 29 сентября 2010

К вашему сведению Если кто-то еще запускает эту же ошибку, после продолжительных консультаций с Ask Solem и Джесси Ноллером из основной команды разработчиков Python, похоже, что это на самом деле ошибка в текущем Python 2.6.x (и, возможно, 2.7+ и, возможно,3.x).Они ищут возможные решения, и исправление, вероятно, будет включено в будущую версию Python.

7 голосов
/ 04 мая 2011

Я страдал от той же проблемы, даже если подключение к локальному хосту в Python 2.7.1.После дня отладки я нашел причину и обходной путь:

Причина: класс BaseProxy имеет локальное хранилище потока, которое кэширует соединение, которое повторно используется для будущих соединений, вызывая ошибки «сломанной трубы» даже при создании нового диспетчера

Обходной путь: удалите кэшированное соединение перед повторным подключением.Добавьте код в предложение try-exc в строке, которая вызывает исключение, а затем повторите его.

from multiprocessing.managers import BaseProxy

...

if address in BaseProxy._address_to_local:
    del BaseProxy._address_to_local[address][0].connection

address - это имя хоста / ip, используемое для подключения к диспетчеру многопроцессорной обработки.Если вы не установили его явно, обычно оно должно быть "localhost"

0 голосов
/ 16 декабря 2014

Также вы можете попытаться перехватить исключение в дочерних процессах, чтобы оно не пыталось закрыть соединение непредвиденно.То же самое происходило со мной, и, наконец, мне пришлось подавить ошибки, чтобы труба не закрывалась внезапно.

...