Я думаю, причина того, что ваш код зависает, состоит в том, что все рабочие задачи в конечном итоге ожидают, что что-то появится во входной очереди со строкой item = Qin.get()
одновременно, потому что get()
"блокирует", ожидая, что что-то будет помещенов очереди.Один из способов избежать этого - использовать вместо этого неблокирующий метод get_nowait()
.Для этого требуется, чтобы код обрабатывал любое исключение Empty
, которое он может вызвать, но при этом не требуется, чтобы какое-либо дальнейшее выполнение в этом процессе было эффективно остановлено на этом этапе.
Также, чтобы все работало, вам необходимо создать и использоватьmultiprocessing.Manager
, который создает серверный процесс, который содержит объекты Python и позволяет другим процессам манипулировать ими через прокси.См. Часть «Серверный процесс» в разделе Состояние общего доступа между процессами в документации.
Кроме того, при использовании multiprocessing
в Windows очень важноУбедитесь, что код основного процесса выполняется условно, поместив его в оператор if __name__ == '__main__':
.Это из-за того, как модуль был реализован на этой платформе - в противном случае этот код будет выполняться снова каждый раз, когда запускается другая параллельная задача (которая включает в себя import
их редактирование).
Ниже приведен вашкод с необходимой модификацией, поэтому он использует multiprocessing.Manager
.Примечание. Я изменил имя вашей функции manager()
, чтобы избежать путаницы с multiprocessing.Manager
, который используется для создания общих объектов.
import multiprocessing
from queue import Empty as QueueEmpty
import os
import time
END_MARKER = 'end'
def worker(id, Qin, Qout, event):
while True:
try:
item = Qin.get_nowait() # Non-blocking.
except QueueEmpty:
if event.is_set(): # Last item seen?
break
continue # Keep polling.
if item == END_MARKER: # Last item?
event.set()
break # Quit.
Qout.put('{} via worker {}'.format(item, id))
time.sleep(.25)
def pool_manager():
processes = os.cpu_count()
pool = multiprocessing.Pool(processes=processes)
manager = multiprocessing.Manager()
Qin, Qout, event = manager.Queue(), manager.Queue(), manager.Event()
for i in range(100):
Qin.put(i)
Qin.put(END_MARKER)
for id in range(processes):
pool.apply_async(worker, (id, Qin, Qout, event))
pool.close() # Done adding tasks.
pool.join() # Wait for all tasks to complete.
return Qout
if __name__ == '__main__':
print('Processing')
Qout = pool_manager()
print('Contents of Qout:')
while not Qout.empty():
item = Qout.get()
print(' ', item)
print('End of script')