Максимальный размер очереди многопроцессорной обработки - 32767 - PullRequest
12 голосов
/ 05 мая 2011

Я пытаюсь написать программу на Python 2.6 (OSX) с использованием многопроцессорной обработки, и я хочу заполнить очередь больше, чем по умолчанию 32767 элементов.

from multiprocessing import Queue
Queue(2**15) # raises OSError

Queue(32767) работает нормально, но любое большее число (например, Queue(32768)) завершается неудачно с OSError: [Errno 22] Invalid argument

Есть ли решение этой проблемы?

Ответы [ 3 ]

3 голосов
/ 05 мая 2011

Один из подходов - обернуть ваш multiprocessing.Queue пользовательским классом (только на стороне производителя или прозрачно с точки зрения потребителя). Используя это, вы поставили бы в очередь элементы, подлежащие отправке в объект Queue, который вы оборачиваете, и передавали вещи только из локальной очереди (объект Python list()) в multiprocess.Queue по мере появления пространства, с обработкой исключений на газ, когда Queue заполнен.

Это, вероятно, самый простой подход, поскольку он должен оказывать минимальное влияние на остальную часть вашего кода. Пользовательский класс должен вести себя как очередь, при этом скрывая базовый multiprocessing.Queue за абстракцией.

(Один из подходов может заключаться в том, чтобы ваш производитель использовал потоки, один поток для управления отправкой из потока Queue в ваш multiprocessing.Queue и любые другие потоки, которые фактически просто подают поток Queue).

2 голосов
/ 21 декабря 2011

Я уже ответил на исходный вопрос, но мне кажется, что я добавил, что списки Redis достаточно надежны, и их поддержка модуля Python чрезвычайно проста в использовании для реализации объекта, подобного очереди. Их преимущество заключается в том, что они позволяют масштабироваться как на несколько узлов (по всей сети), так и на несколько процессов.

В основном, чтобы использовать те, которые вы просто выбрали бы ключ (строку) для имени своей очереди, проследите, чтобы ваши продюсеры вставили в него и ваши рабочие (потребители задач) зацикливались на блокировании всплывающих окон из этого ключа.

Все команды Redis BLPOP и BRPOP принимают список ключей (списки / очереди) и необязательное значение времени ожидания. Они возвращают кортеж (ключ, значение) или None (по таймауту). Таким образом, вы можете легко написать систему, управляемую событиями, которая очень похожа на привычную структуру select () (но на гораздо более высоком уровне). Единственное, на что вам нужно обратить внимание - это пропущенные ключи и недопустимые типы ключей (конечно, просто оберните ваши операции с очередями обработчиками исключений). (Если какое-либо другое приложение останавливается на вашем общем сервере Redis, удаляя ключи или заменяя ключи, которые вы использовали в качестве очередей со строковыми / целыми числами или другими типами значений ... ну, у вас есть другая проблема в этой точке). :)

Еще одним преимуществом этой модели является то, что Redis сохраняет свои данные на диск. Таким образом, ваша рабочая очередь может выдержать перезапуски системы, если вы разрешите это.

(Конечно, вы можете реализовать простую очередь в виде таблицы в SQLlite или любой другой системе SQL, если вы действительно хотите это сделать; просто используйте какой-нибудь автоматически увеличивающийся индекс для последовательности и столбец, чтобы пометить каждый элемент как будучи «готовым» (потребленным), но это действительно несколько сложнее, чем использование того, что Redis дает вам «из коробки»).

0 голосов
/ 05 мая 2011

Работа для меня на MacOSX

>>> import Queue
>>> Queue.Queue(30000000)
<Queue.Queue instance at 0x1006035f0>
...