Каков наилучший способ управления потоком дочерних процессов в Python? - PullRequest
1 голос
/ 01 апреля 2020

Я пытаюсь запустить, приостановить и завершить дочерние процессы в Python из родительского процесса. Я пытался использовать multiprocessing.Value, но по какой-то причине родительский процесс никогда не завершается полностью, хотя я terminate и join все процессы. Мой вариант использования выглядит примерно так:

def child(flow_flag):
   while True:
      with flow_flag.get_lock():
          flag_value = flow_flag.value
      if flag_value == 0:
         print("This is doing some work")
      elif flag_value == 1:
         print("This is waiting for some time to check back later")
         time.sleep(5)
      else:
         print("Time to exit")
         break

def main():
    flow_flag = Value('i', 0)
    processes  = [Process(target=child, args=(flow_flag,)) for i in range(10)]
    [p.start() for p in processes]    
    print("Waiting for some work")
    with flow_flag.get_lock():    
        flow_flag.value = 1
    print("Do something else")
    with flow_flag.get_lock():    
        flow_flag.value = 0
    print("Waiting for more work")
    with flow_flag.get_lock():    
        flow_flag.value = 2
    print("Exiting")
    for p in processes:
        p.terminate()
        p.join()

Это никогда не заканчивается должным образом, и мне в конечном итоге приходится Ctrl+C. Тогда я вижу это сообщение:

Traceback (most recent call last):
  File "/home/abcde/anaconda3/lib/python3.7/threading.py", line 1308, in _shutdown
    lock.acquire()
KeyboardInterrupt

Что лучше? К вашему сведению, ожидая чего-то другого, я порождаю некоторые другие процессы. У меня также было, чтобы они не заканчивались должным образом, и я использовал с ними Value. Это было исправлено, когда я переключился на использование Queue для них. Тем не менее, Queue не подходит для описанного выше случая.

PS: я sh в Ubuntu 18.04.

РЕДАКТИРОВАТЬ: после большой отладки, не выход из программы произошел из-за библиотеки, которую я использую, которую я не подозревал, чтобы вызвать это. Мои извинения за ложную тревогу. Спасибо за предложения о том, как лучше контролировать дочерние процессы.

Ответы [ 2 ]

0 голосов
/ 02 апреля 2020

Ваша программа работает для меня, но позвольте мне вмешаться в вопрос "есть ли другой путь". Вместо опроса с 5-секундными интервалами вы можете создать общий объект события, который позволяет дочерним процессам знать, когда они могут выполнять свою работу. Вместо опроса значения 1 дождитесь события.

from multiprocessing import *
import time
import os

def child(event, times_up):
    while True:
        event.wait()
        if times_up.value:
            print(os.getpid(), "time to exit")
            return
        print(os.getpid(), "doing work")
        time.sleep(.5)

def main():
    manager = Manager()
    event = manager.Event()
    times_up = manager.Value(bool, False)
    processes  = [Process(target=child, args=(event, times_up)) for i in range(10)]
    [p.start() for p in processes]
    print("Let processes work")
    event.set()
    time.sleep(2)
    print("Make them stop")
    event.clear()
    time.sleep(4)
    print("Make them go away")
    times_up.value = True
    event.set()
    print("Exiting")
    for p in processes:
        p.join()

if __name__ == "__main__":
    main()
0 голосов
/ 02 апреля 2020

При Python 3.7.7, работающем во FreeBSD 12.1 (64-битная версия), я не могу воспроизвести вашу проблему.

После исправления отступа и добавления необходимых импортов Измененная программа работает нормально AFAICT.

Кстати, вы можете import sys и добавить

sys.stdout.reconfigure(line_buffering=True)

в начале вашего main().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...