Первоначально я удалил этот ответ после того, как прочитал @Martijn Pieters ', так как он описал «почему это не работает» более подробно и ранее. затем
Я понял, что сценарий использования в примере OP не совсем соответствует канонически звучащему названию
«Как использовать метод multiprocessing.Queue.get».
Это не потому, что есть
для демонстрации не задействован ни один дочерний процесс, а потому, что в реальных приложениях очередь почти не заполняется заранее и только считывается после, но при чтении
и запись происходит чередованием со временем ожидания между ними. Расширенный демонстрационный код, который продемонстрировал Мартейн, не будет работать в обычных сценариях, потому что цикл while прервется слишком рано, когда постановка в очередь не успевает за чтением. Итак, вот перезагруженный ответ, который может справиться с обычными сценариями чередования каналов и чтений:
Не полагайтесь на queue.empty проверяет синхронизацию.
После помещения объекта в пустую очередь может возникнуть бесконечно малая задержка, прежде чем метод empty () в очереди вернет False и get_nowait () сможет вернуться без вызова queue.Empty.
...
пусто ()
Вернуть True, если очередь пуста, иначе False. Из-за многопоточной / многопроцессорной семантики это ненадежно. 1020 * Docs *
Либо используйте for msg in iter(queue.get, sentinel):
до .get()
из очереди, где вы выходите из цикла, передавая значение часового ... iter (вызываемое, страж)?
from multiprocessing import Queue
SENTINEL = None
if __name__ == '__main__':
queue = Queue()
for i in [*range(3), SENTINEL]:
queue.put(i)
for msg in iter(queue.get, SENTINEL):
print(msg)
... или используйте get_nowait()
и обработайте возможное исключение queue.Empty
, если вам нужно неблокирующее решение.
from multiprocessing import Queue
from queue import Empty
import time
SENTINEL = None
if __name__ == '__main__':
queue = Queue()
for i in [*range(3), SENTINEL]:
queue.put(i)
while True:
try:
msg = queue.get_nowait()
if msg == SENTINEL:
break
print(msg)
except Empty:
# do other stuff
time.sleep(0.1)
В случае, если только один процесс и только один поток в этом процессе читает очередь, было бы также возможно обменять последний фрагмент кода с:
while True:
if not queue.empty(): # this is not an atomic operation ...
msg = queue.get() # ... thread could be interrupted in between
if msg == SENTINEL:
break
print(msg)
else:
# do other stuff
time.sleep(0.1)
Поскольку поток может отбросить GIL между проверками if not queue.empty()
и queue.get()
, это не подходит для многопоточного чтения очереди в процессе. То же самое относится, если несколько процессов читают из очереди.
Для сценариев с одним производителем / одним потребителем будет достаточно multiprocessing.Pipe
вместо multiprocessing.Queue
и более производительным.