Новая версия кода выше ...
Не уверен, насколько хорошо работает выбор в многопроцессорной очереди в Windows. Так как select в windows прослушивает сокеты, а не дескрипторы файлов, я подозреваю, что могут быть проблемы.
Мой ответ состоит в том, чтобы создать поток для прослушивания каждой очереди блокирующим способом и поместить все результаты в одну очередь, прослушиваемую основным потоком, по существу объединяя отдельные очереди в одну.
Мой код для этого:
"""
Allow multiple queues to be waited upon.
An EndOfQueueMarker marks a queue as
"all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.
"""
import queue
import threading
class EndOfQueueMarker:
def __str___(self):
return "End of data marker"
pass
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
q_run = True
while q_run:
data = self.inq.get()
result = (self.inq,data)
self.sharedq.put(result)
if data is EndOfQueueMarker:
q_run = False
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
self.qList = list_of_queues
self.qrList = []
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
self.qrList.append(qr)
def get(self,blocking=True,timeout=None):
res = []
while len(res)==0:
if len(self.qList)==0:
res = (self,EndOfQueueMarker)
else:
res = queue.Queue.get(self,blocking,timeout)
if res[1] is EndOfQueueMarker:
self.qList.remove(res[0])
res = []
return res
def join(self):
for qr in self.qrList:
qr.join()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
Следующий код является моей тестовой программой, чтобы показать, как она работает:
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
res = q3.get()[1]
print ("returning result =",res)
have_data = not(res==multiq.EndOfQueueMarker)