Я пытаюсь распечатать идентификаторы в этом списке, у которых задержка между началом и концом процесса, и задержка между queue.get (которую я реализую, используя threading.Timer с общей блокировкой ). Проблема, с которой я сталкиваюсь, состоит в том, что, хотя моя текущая настройка наличия таймера позволяет мне блокировать процессы, так что после того, как один процесс получает запись из очереди, которую не могут запустить все другие процессы, уходит 2 секунды, моя программа закрывается только 2 из 4 процессов в конце запуска программы. Как я могу это исправить, чтобы все процессы закрылись и программа могла выйти.
Мой вывод ниже показывает это, так как я хочу, чтобы было еще 2 «рабочих закрытых» уведомления:
Process started
Process started
Process started
Process started
begin 1 : 1560891818.0307562
begin 2 : 1560891820.0343137
begin 3 : 1560891822.0381632
end 2 : 3.0021514892578125
end 1 : 6.004615068435669
begin 4 : 1560891824.0439706
begin 5 : 1560891826.0481522
end 4 : 3.004107713699341
end 3 : 6.005637168884277
begin 6 : 1560891828.0511773
begin 7 : 1560891830.0557532
end 6 : 3.0032966136932373
end 5 : 6.006829261779785
begin 8 : 1560891832.056265
begin 9 : 1560891834.0593572
end 8 : 3.011284112930298
end 7 : 6.005618333816528
begin 10 : 1560891836.0627353
end 10 : 3.0014095306396484
worker closed
end 9 : 6.000675916671753
worker closed
import multiprocessing
from time import sleep, time
import threading
class TEMP:
lock = multiprocessing.Lock()
id_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
queue = multiprocessing.Queue(10)
DELAY = 2
def mp_worker(self, queue, lock):
while queue.qsize() > 0:
lock.acquire()
# Release the lock after a delay
threading.Timer(self.DELAY,lock.release).start()
record = queue.get()
start_time = time()
print("begin {0} : {1}".format(record, start_time))
if (record % 2 == 0):
sleep(3)
else:
sleep(6)
print("end {0} : {1}".format(record, time() - start_time))
threading.Timer.join()
print("worker closed")
def mp_handler(self):
# Spawn two processes, assigning the method to be executed
# and the input arguments (the queue)
processes = [multiprocessing.Process(target=self.mp_worker, args=([self.queue, self.lock])) \
for _ in range(4)]
for process in processes:
process.start()
print('Process started')
for process in processes:
process.join()
def start_mp(self):
for id in self.id_list:
self.queue.put(id)
self.mp_handler()
if __name__ == '__main__':
temp = TEMP()
temp.start_mp()