Я использовал многопроцессорную обработку Python обычным способом, отправив ядовитую таблетку или sentinel , чтобы указать, что очередь должна закончиться.
Теперь я пытаюсьиспользовать JoinableQueue.join
& JoinableQueue.task_done
методы, и я не могу понять, как его использовать. Во всех примерах, которые я погуглил, используется подход «отравленная таблетка», даже с JoinableQueue
.
Как правильно написать?
from multiprocessing import Process, JoinableQueue
from multiprocessing.queues import Empty
from time import sleep
def file_reader(q, filename):
# pretend it reads a file with different speed
sleep(1)
q.put('A')
sleep(1)
q.put('B')
sleep(2)
q.put('C')
def data_processor(q1, q2):
# this one processes the data rather quickly
while True:
data = q1.get()
q2.put(f'> {data} <')
q1.task_done()
q1 = JoinableQueue()
q2 = JoinableQueue()
p1 = Process(target=file_reader, args=(q1, 'test'))
p2 = Process(target=data_processor, args=(q1, q2))
p1.start()
p2.start()
while True:
try:
data1 = q2.get(timeout=3)
print(data1)
q2.task_done()
except Empty:
print('empty!')
break
p1.join()
print('p1 joined')
p2.join()
print('p2 joined')
Этот код печатает:
> A <
> B <
> C <
empty!
p1 joined
p2 фактически никогда не заканчивается.