Python: разделение временной блокировки между порожденными процессами, чтобы между ними была задержка - PullRequest
0 голосов
/ 19 июня 2019

Я пытаюсь распечатать идентификаторы в этом списке, у которых задержка между началом и концом процесса, и задержка между queue.get (которую я реализую, используя threading.Timer с общей блокировкой ). Проблема, с которой я сталкиваюсь, состоит в том, что, хотя моя текущая настройка наличия таймера позволяет мне блокировать процессы, так что после того, как один процесс получает запись из очереди, которую не могут запустить все другие процессы, уходит 2 секунды, моя программа закрывается только 2 из 4 процессов в конце запуска программы. Как я могу это исправить, чтобы все процессы закрылись и программа могла выйти.

Мой вывод ниже показывает это, так как я хочу, чтобы было еще 2 «рабочих закрытых» уведомления:

Process started
Process started
Process started
Process started
begin 1 : 1560891818.0307562
begin 2 : 1560891820.0343137
begin 3 : 1560891822.0381632
end 2 : 3.0021514892578125
end 1 : 6.004615068435669
begin 4 : 1560891824.0439706
begin 5 : 1560891826.0481522
end 4 : 3.004107713699341
end 3 : 6.005637168884277
begin 6 : 1560891828.0511773
begin 7 : 1560891830.0557532
end 6 : 3.0032966136932373
end 5 : 6.006829261779785
begin 8 : 1560891832.056265
begin 9 : 1560891834.0593572
end 8 : 3.011284112930298
end 7 : 6.005618333816528
begin 10 : 1560891836.0627353
end 10 : 3.0014095306396484
worker closed
end 9 : 6.000675916671753
worker closed
import multiprocessing
from time import sleep, time
import threading

class TEMP:

    lock = multiprocessing.Lock()

    id_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    queue = multiprocessing.Queue(10)

    DELAY = 2

    def mp_worker(self, queue, lock):

        while queue.qsize() > 0:

            lock.acquire()
            # Release the lock after a delay
            threading.Timer(self.DELAY,lock.release).start()

            record = queue.get()
            start_time = time()
            print("begin {0} : {1}".format(record, start_time))
            if (record % 2 == 0):
                sleep(3)
            else:
                sleep(6)
            print("end {0} : {1}".format(record, time() - start_time))

            threading.Timer.join()

        print("worker closed")

    def mp_handler(self):

        # Spawn two processes, assigning the method to be executed
        # and the input arguments (the queue)
        processes = [multiprocessing.Process(target=self.mp_worker, args=([self.queue, self.lock])) \
            for _ in range(4)]

        for process in processes:
            process.start()
            print('Process started')


        for process in processes:
            process.join()

    def start_mp(self):

        for id in self.id_list:
            self.queue.put(id)

        self.mp_handler()

if __name__ == '__main__':
    temp = TEMP()
    temp.start_mp()

1 Ответ

0 голосов
/ 15 июля 2019

Я на самом деле исправил эту проблему. Основная причина, по которой мой код не был присоединен, заключалась в том, что мой код проверял, пуста ли очередь, ожидал задержки, а затем пытался получить что-то из очереди. Это означало, что к концу программы, когда очередь стала пустой и 2 из 4 процессов были успешно завершены одновременно, оставшиеся 2 процесса были задержаны. Когда эта задержка закончилась, они попытались получить что-то из очереди, но поскольку очередь была пуста, они просто заблокировали выполнение оставшейся части кода процесса, что означало, что они никогда не смогут присоединиться к резервной копии.

Я исправил это, также проверив, пуста ли очередь прямо перед тем, как процесс попытается получить что-то из очереди. Моя фиксированная рабочая функция ниже:

def mp_worker(self, queue, lock):

    while not queue.empty():

        print(mp.current_process().name)
        lock.acquire()
        # Release the lock after a delay
        timer = Timer(self.DELAY, lock.release)
        timer.start()

        if not queue.empty():
            record = queue.get(False)

            start_time = time()
            print("begin {0} : {1}".format(record, start_time))
            if (record % 2 == 0):
                sleep(3)
            else:
                sleep(6)
            print("end {0} : {1}".format(record, time() - start_time))

    print("{0} closed".format(mp.current_process().name))
...