"выбрать" в нескольких очередях многопроцессорной обработки Python? - PullRequest
26 голосов
/ 14 июля 2009

Каков наилучший способ подождать (без вращения), пока что-либо не будет доступно в одной из двух (многопроцессорная обработка) Очереди , где оба находятся в одной и той же системе?

Ответы [ 8 ]

25 голосов
/ 29 января 2010

На самом деле вы можете использовать multiprocessing.Queue объекты в select.select. т.е.

que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])

выберет que, только если он готов к чтению из.

Нет документации об этом, хотя. Я читал исходный код библиотеки multiprocessing.queue (в linux он обычно выглядит как /usr/lib/python2.6/multiprocessing/queue.py), чтобы выяснить это.

С Queue.Queue я не нашел никакого умного способа сделать это (и я бы очень хотел).

14 голосов
/ 14 июля 2009

Не похоже, что есть официальный способ справиться с этим. Или, по крайней мере, не основываясь на этом:

Вы можете попробовать что-то наподобие того, что делает этот пост - получить доступ к дескрипторам основного канала:

и затем используйте select.

2 голосов
/ 14 июля 2009

Похоже, использование потоков, которые пересылают входящие элементы в одну очередь, которую вы затем ждете, является практическим выбором при использовании многопроцессорной обработки независимо от платформы.

Для избежания потоков требуется либо обработка низкоуровневых каналов / FD, что зависит от платформы и не поддается простому согласованию с высокоуровневым API.

Или вам потребуются очереди с возможностью устанавливать обратные вызовы, которые, я думаю, являются подходящим интерфейсом более высокого уровня. То есть вы бы написали что-то вроде:

  singlequeue = Queue()
  incoming_queue1.setcallback(singlequeue.put)
  incoming_queue2.setcallback(singlequeue.put)
  ...
  singlequeue.get()

Возможно, многопроцессорный пакет может расширить этот API, но его пока нет. Эта концепция хорошо работает с py.execnet, который использует термин «канал» вместо «очереди», см. Здесь http://tinyurl.com/nmtr4w

1 голос
/ 26 апреля 2011

Новая версия кода выше ...

Не уверен, насколько хорошо работает выбор в многопроцессорной очереди в Windows. Так как select в windows прослушивает сокеты, а не дескрипторы файлов, я подозреваю, что могут быть проблемы.

Мой ответ состоит в том, чтобы создать поток для прослушивания каждой очереди блокирующим способом и поместить все результаты в одну очередь, прослушиваемую основным потоком, по существу объединяя отдельные очереди в одну.

Мой код для этого:

"""
Allow multiple queues to be waited upon.

An EndOfQueueMarker marks a queue as
    "all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.

"""
import queue
import threading

class EndOfQueueMarker:
    def __str___(self):
        return "End of data marker"
    pass

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        q_run = True
        while q_run:
            data = self.inq.get()
            result = (self.inq,data)
            self.sharedq.put(result)
            if data is EndOfQueueMarker:
                q_run = False

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        self.qList = list_of_queues
        self.qrList = []
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()
            self.qrList.append(qr)
    def get(self,blocking=True,timeout=None):
        res = []
        while len(res)==0:
            if len(self.qList)==0:
                res = (self,EndOfQueueMarker)
            else:
                res = queue.Queue.get(self,blocking,timeout)
                if res[1] is EndOfQueueMarker:
                    self.qList.remove(res[0])
                    res = []
        return res

    def join(self):
        for qr in self.qrList:
            qr.join()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

Следующий код является моей тестовой программой, чтобы показать, как она работает:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
    res = q3.get()[1]
    print ("returning result =",res)
    have_data = not(res==multiq.EndOfQueueMarker)
1 голос
/ 26 апреля 2011

Не уверен, насколько хорошо работает выбор в многопроцессорной очереди в Windows. Так как select в windows прослушивает сокеты, а не дескрипторы файлов, я подозреваю, что могут быть проблемы.

Мой ответ состоит в том, чтобы создать поток для прослушивания каждой очереди блокирующим способом и поместить все результаты в одну очередь, прослушиваемую основным потоком, по существу, объединяя отдельные очереди в одну.

Мой код для этого:

"""
Allow multiple queues to be waited upon.

queue,value = multiq.select(list_of_queues)
"""
import queue
import threading

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        while True:
            data = self.inq.get()
            print ("thread reads data=",data)
            result = (self.inq,data)
            self.sharedq.put(result)

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

Следующая процедура тестирования показывает, как ее использовать:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)

res=0
while not res==4:
    while not q3.empty():
        res = q3.get()[1]
        print ("returning result =",res)

Надеюсь, это поможет.

Тони Уоллес

1 голос
/ 14 июля 2009

Вы можете использовать что-то вроде шаблона Observer , в котором подписчики очереди уведомляются об изменениях состояния.

В этом случае ваш рабочий поток может быть назначен слушателем в каждой очереди, и всякий раз, когда он получает сигнал готовности, он может работать с новым элементом, в противном случае - в спящем режиме.

0 голосов
/ 11 февраля 2019

Начиная с Python 3.3, вы можете использовать multiprocessing.connection.wait для ожидания сразу нескольких Queue._reader объектов.

0 голосов
/ 18 января 2014

Не делай этого.

Поместите заголовок в сообщения и отправьте их в общую очередь. Это упрощает код и в целом будет чище.

...