Сброс многопроцессорности. Очередь в список - PullRequest
20 голосов
/ 09 октября 2009

Я хочу выбросить multiprocessing.Queue в список. Для этой задачи я написал следующую функцию:

import Queue

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    # START DEBUG CODE
    initial_size = queue.qsize()
    print("Queue has %s items initially." % initial_size)
    # END DEBUG CODE

    while True:
        try:
            thing = queue.get(block=False)
            result.append(thing)
        except Queue.Empty:

            # START DEBUG CODE
            current_size = queue.qsize()
            total_size = current_size + len(result)
            print("Dumping complete:")
            if current_size == initial_size:
                print("No items were added to the queue.")
            else:
                print("%s items were added to the queue." % \
                      (total_size - initial_size))
            print("Extracted %s items from the queue, queue has %s items \
            left" % (len(result), current_size))
            # END DEBUG CODE

            return result

Но по какой-то причине это не работает.

Соблюдайте следующую сессию оболочки:

>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> for i in range(100):
...     q.put([range(200) for j in range(100)])
... 
>>> q.qsize()
100
>>> l=dump_queue(q)
Queue has 100 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 99 items left
>>> l=dump_queue(q)
Queue has 99 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 3 items from the queue, queue has 96 items left
>>> l=dump_queue(q)
Queue has 96 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 95 items left
>>> 

Что здесь происходит? Почему не все предметы сбрасываются?

Ответы [ 2 ]

25 голосов
/ 09 октября 2009

Попробуйте это:

import Queue
import time

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    for i in iter(queue.get, 'STOP'):
        result.append(i)
    time.sleep(.1)
    return result

import multiprocessing
q = multiprocessing.Queue()
for i in range(100):
    q.put([range(200) for j in range(100)])
q.put('STOP')
l=dump_queue(q)
print len(l)

В многопроцессорных очередях есть внутренний буфер, который имеет поток фидера, который вытягивает работу из буфера и сбрасывает его в канал. Если бы не все объекты были сброшены, я мог бы увидеть случай, когда Empty поднимается преждевременно. Использование сторожа для указания конца очереди является безопасным (и надежным). Также лучше использовать идиому iter (get, sentinel), чем полагаться на Empty.

Мне не нравится, что он может поднять пустой из-за сбрасывания времени (я добавил time.sleep (.1), чтобы разрешить переключение контекста в поток фидера, он вам может не понадобиться, он работает без него - привычка выпускать GIL).

3 голосов
/ 29 сентября 2016

В некоторых ситуациях мы уже все вычислили и хотим просто преобразовать очередь.

shared_queue = Queue()
shared_queue_list = []
...
join() #All process are joined
while shared_queue.qsize() != 0:
    shared_queue_list.append(shared_queue.get())

Теперь в shared_queue_list результаты преобразованы в список.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...