Как использовать метод multiprocessing.Queue.get? - PullRequest
0 голосов
/ 03 ноября 2018

Приведенный ниже код помещает три числа в очередь. Затем он пытается вернуть номера из очереди. Но это никогда не происходит. Как получить данные из очереди?

import multiprocessing

queue = multiprocessing.Queue()

for i in range(3):
    queue.put(i)

while not queue.empty():
    print queue.get()

Ответы [ 3 ]

0 голосов
/ 03 ноября 2018

Первоначально я удалил этот ответ после того, как прочитал @Martijn Pieters ', так как он описал «почему это не работает» более подробно и ранее. затем Я понял, что сценарий использования в примере OP не совсем соответствует канонически звучащему названию

«Как использовать метод multiprocessing.Queue.get».

Это не потому, что есть для демонстрации не задействован ни один дочерний процесс, а потому, что в реальных приложениях очередь почти не заполняется заранее и только считывается после, но при чтении и запись происходит чередованием со временем ожидания между ними. Расширенный демонстрационный код, который продемонстрировал Мартейн, не будет работать в обычных сценариях, потому что цикл while прервется слишком рано, когда постановка в очередь не успевает за чтением. Итак, вот перезагруженный ответ, который может справиться с обычными сценариями чередования каналов и чтений:


Не полагайтесь на queue.empty проверяет синхронизацию.

После помещения объекта в пустую очередь может возникнуть бесконечно малая задержка, прежде чем метод empty () в очереди вернет False и get_nowait () сможет вернуться без вызова queue.Empty. ...

пусто ()

Вернуть True, если очередь пуста, иначе False. Из-за многопоточной / многопроцессорной семантики это ненадежно. 1020 * Docs *

Либо используйте for msg in iter(queue.get, sentinel): до .get() из очереди, где вы выходите из цикла, передавая значение часового ... iter (вызываемое, страж)?

from multiprocessing import Queue

SENTINEL = None

if __name__ == '__main__':

    queue = Queue()

    for i in [*range(3), SENTINEL]:
        queue.put(i)

    for msg in iter(queue.get, SENTINEL):
        print(msg)

... или используйте get_nowait() и обработайте возможное исключение queue.Empty, если вам нужно неблокирующее решение.

from multiprocessing import Queue
from queue import Empty
import time

SENTINEL = None

if __name__ == '__main__':

    queue = Queue()

    for i in [*range(3), SENTINEL]:
        queue.put(i)

    while True:
        try:
            msg = queue.get_nowait()
            if msg == SENTINEL:
                break
            print(msg)
        except Empty:
            # do other stuff
            time.sleep(0.1)

В случае, если только один процесс и только один поток в этом процессе читает очередь, было бы также возможно обменять последний фрагмент кода с:

while True:
    if not queue.empty():  # this is not an atomic operation ...
        msg = queue.get()  # ... thread could be interrupted in between
        if msg == SENTINEL:
            break
        print(msg)
    else:
        # do other stuff
        time.sleep(0.1)

Поскольку поток может отбросить GIL между проверками if not queue.empty() и queue.get(), это не подходит для многопоточного чтения очереди в процессе. То же самое относится, если несколько процессов читают из очереди.

Для сценариев с одним производителем / одним потребителем будет достаточно multiprocessing.Pipe вместо multiprocessing.Queue и более производительным.

0 голосов
/ 04 ноября 2018

Проверьте queue перед использованием get:

import multiprocessing

queue = multiprocessing.Queue()

for i in range(3):
    queue.put(i)

while not queue.empty():
    if not queue.empty():
        print queue.get()
0 голосов
/ 03 ноября 2018

Ваш код на самом деле работает, иногда .

Это потому, что очередь не мгновенно не пуста. Реализация немного больше задействована для поддержки связи между несколькими процессами, поэтому задействованы потоки и каналы, которые заставляют состояние empty длиться немного дольше, чем позволяет ваш код.

См. Примечание в Трубы и очереди раздел :

Когда объект помещается в очередь, объект обрабатывается и фоновый поток позже сбрасывает обработанные данные в нижележащий канал. Это имеет некоторые последствия, которые немного удивляют, но не должны вызывать каких-либо практических трудностей - если они действительно вас беспокоят, вы можете вместо этого использовать очередь, созданную с помощью менеджера.

  1. После помещения объекта в пустую очередь может возникнуть бесконечно малая задержка, прежде чем метод empty() очереди вернет False [...]

(жирный акцент мой)

Если вы добавляете цикл для проверки пустоты сначала , тогда ваш код работает:

queue = multiprocessing.Queue()

for i in range(3):
    queue.put(i)

while queue.empty():
    print 'queue is still empty'

while not queue.empty():
    print queue.get()

Когда вы запускаете вышеописанное, большую часть времени 'queue is still empty' появляется один раз. Иногда он вообще не появляется, а иногда печатается дважды.

...