Чтобы ускорить выполнение определенной задачи, я создаю подкласс Process
, чтобы создать работника, который будет обрабатывать данных, поступающих в выборках.Некоторые управляющие классы будут передавать данные и читать выходные данные (используя два экземпляра Queue
).Для асинхронной работы я использую put_nowait
и get_nowait
.В конце я отправляю специальный код завершения моему процессу, после чего он прерывает свой внутренний цикл.Однако ... этого никогда не происходит.Вот минимальный воспроизводимый пример:
import multiprocessing as mp
class Worker(mp.Process):
def __init__(self, in_queue, out_queue):
super(Worker, self).__init__()
self.input_queue = in_queue
self.output_queue = out_queue
def run(self):
while True:
received = self.input_queue.get(block=True)
if received is None:
break
self.output_queue.put_nowait(received)
print("\tWORKER DEAD")
class Processor():
def __init__(self):
# prepare
in_queue = mp.Queue()
out_queue = mp.Queue()
worker = Worker(in_queue, out_queue)
# get to work
worker.start()
in_queue.put_nowait(list(range(10**5))) # XXX
# clean up
print("NOTIFYING")
in_queue.put_nowait(None)
#out_queue.get() # XXX
print("JOINING")
worker.join()
Processor()
Этот код никогда не завершается, постоянно висит так:
NOTIFYING
JOINING
WORKER DEAD
Почему?
I 'мы пометили две строки XXX
.В первом случае, если я отправлю меньше данных (скажем, 10**4
), все закончится нормально (процессы присоединяются, как и ожидалось).Точно так же во втором, если я get()
после уведомления рабочих, чтобы закончить.Я знаю, что что-то упустил, но ничего в документации не кажется уместным.