В Python, как узнать, когда процесс завершен? - PullRequest
23 голосов
/ 14 февраля 2011

Из Python GUI (PyGTK) я запускаю процесс (используя многопроцессорность).Процесс занимает много времени (~ 20 минут), чтобы закончить.Когда процесс закончится, я бы хотел его очистить (извлечь результаты и присоединиться к процессу).Как я знаю, когда процесс завершился?

Мой коллега предложил занятый цикл в родительском процессе, который проверяет, завершен ли дочерний процесс.Конечно, есть лучший способ.

В Unix, когда процесс разветвляется, обработчик сигнала вызывается из родительского процесса, когда дочерний процесс завершил .Но я не вижу ничего подобного в Python.Я что-то упускаю?

Как получается, что конец дочернего процесса можно наблюдать из родительского процесса?(Конечно, я не хочу вызывать Process.join (), так как это приведет к зависанию интерфейса GUI.)

Этот вопрос не ограничивается мультипроцессором: у меня точно такая же проблема с мультипроцессором.резьб.

Ответы [ 5 ]

11 голосов
/ 15 февраля 2011

Я думаю, что в рамках создания многоплатформенного Python простые вещи, такие как SIGCHLD, должны выполняться сами. Согласитесь, это немного больше работы, когда все, что вы хотите сделать, это знать, когда ребенок закончил, но на самом деле это НЕ ТАК больно. Рассмотрим следующее, в котором для выполнения работы используется дочерний процесс, два экземпляра multiprocessing.Event и поток для проверки выполнения дочернего процесса:

import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def checkChild(event, killEvent):
    event.wait()
    print "Child checked, and is done playing"
    if raw_input("Do again? y/n:") == "y":
        event.clear()
        t = threading.Thread(target=checkChild, args=(event, killEvent))
        t.start()
        p = Process(target=childsPlay, args=(event,))
        p.start()
    else:
        cleanChild()
        killEvent.set()

def cleanChild():
    print "Cleaning up the child..."

if __name__ == '__main__':
    event = Event()
    killEvent = Event()

    # process to do work
    p = Process(target=childsPlay, args=(event,))
    p.start()

    # thread to check on child process
    t = threading.Thread(target=checkChild, args=(event, killEvent))
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        print "Main done"

EDIT

Присоединение ко всем созданным процессам и потокам - это хорошая практика, поскольку она помогает указать, когда создаются процессы / потоки, не завершающиеся зомби. Я изменил приведенный выше код, сделав класс ChildChecker, который наследует от threading.Thread. Его единственная цель - запустить задание в отдельном процессе, дождаться его завершения и затем уведомить графический интерфейс, когда все будет завершено. Присоединение к ChildChecker также присоединится к процессу, который он «проверяет». Теперь, если процесс не включается через 5 секунд, поток принудительно завершит процесс. Ввод "y" создает запускает дочерний процесс, выполняющий "endlessChildsPlay", который должен продемонстрировать принудительное завершение.

import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def endlessChildsPlay(event):
    print "Endless child started"
    while True:
        print "Endless child is playing..."
        sleep(1)
        event.set()
    print "Endless child done"

class ChildChecker(threading.Thread):
    def __init__(self, killEvent):
        super(ChildChecker, self).__init__()
        self.killEvent = killEvent
        self.event = Event()
        self.process = Process(target=childsPlay, args=(self.event,))

    def run(self):
        self.process.start()

        while not self.killEvent.is_set():
            self.event.wait()
            print "Child checked, and is done playing"
            if raw_input("Do again? y/n:") == "y":
                self.event.clear()
                self.process = Process(target=endlessChildsPlay, args=(self.event,))
                self.process.start()
            else:
                self.cleanChild()
                self.killEvent.set()

    def join(self):
        print "Joining child process"
        # Timeout on 5 seconds
        self.process.join(5)

        if self.process.is_alive():
            print "Child did not join!  Killing.."
            self.process.terminate()
        print "Joining ChildChecker thread"
        super(ChildChecker, self).join()


    def cleanChild(self):
        print "Cleaning up the child..."

if __name__ == '__main__':
    killEvent = Event()
    # thread to check on child process
    t = ChildChecker(killEvent)
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        t.join()
        print "Main done"
2 голосов
/ 17 февраля 2011

Этот ответ действительно прост!(Это заняло у меня дней .)

В сочетании с idle_add () PyGTK вы можете создать AutoJoiningThread.Общий код границы является тривиальным:

class AutoJoiningThread(threading.Thread):
    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)

Если вы хотите сделать больше, чем просто объединение (например, сбор результатов), тогда вы можете расширить вышеуказанный класс для выдачи сигналов по завершении, как это сделано в следующемпример:

import threading
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = None

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
        print "Child finished playing."
        self.result = 42

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

if __name__ == '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    thread = AutoJoiningThread(target=child.play,
                               args=(3,))
    thread.connect('finished', child.get_result)
    print "Starting thread"
    thread.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt.  Quiting."
        sys.exit()

    print "God knows how we got here.  Quiting."
    sys.exit()

Вывод приведенного выше примера будет зависеть от порядка выполнения потоков, но он будет похож на:

Creating child
Creating thread
Starting thread
Child starting to play.
 Child playing.
Running mainloop (Ctrl+C to exit)
Child playing.
Child playing.
Child finished playing.
Called Thread.join()
The result was 42
^CReceived KeyboardInterrupt.  Quiting.

Невозможно создать процесс AutoJoiningProcessтаким же образом (потому что мы не можем вызвать idle_add () для двух разных процессов), однако мы можем использовать AutoJoiningThread, чтобы получить то, что нам нужно:

class AutoJoiningProcess(multiprocessing.Process):
    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start() # automatically joins

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()

Для демонстрации AutoJoiningProcess здесь приведен еще один пример:

import threading
import multiprocessing
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = multiprocessing.Manager().list()

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
    print "Child finished playing."
        self.result.append(42)

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
    }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

class AutoJoiningProcess(multiprocessing.Process, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start()

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        multiprocessing.Process.join(self)
        print "Called Process.join()"

if __name__ == '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    process = AutoJoiningProcess(target=child.play,
                               args=(3,))
    process.connect('finished',child.get_result)
    print "Starting thread"
    process.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt.  Quiting."
        sys.exit()

    print "God knows how we got here.  Quiting."
    sys.exit()

Результирующий вывод будет очень похож на приведенный выше пример, за исключением того, что на этот раз у нас есть как присоединение к процессу, так и присоединение к нему сопутствующего потока:

Creating child
Creating thread
Starting thread
Running mainloop (Ctrl+C to exit)
 Child starting to play.
Child playing.
Child playing.
Child playing.
Child finished playing.
Called Process.join()
The result was [42]
Called Thread.join()
^CReceived KeyboardInterrupt.  Quiting.

К сожалению:

  1. Это решение зависит от gobject из-за использования idle_add ().gobject используется PyGTK.
  2. Это не настоящие отношения родитель / потомок.Если один из этих потоков запущен другим потоком, он, тем не менее, будет присоединен к потоку, выполняющему основной цикл, а не к родительскому потоку.Эта проблема сохраняется и для AutoJoiningProcess, за исключением того, что, как мне кажется, будет выдано исключение.

Таким образом, для использования этого подхода лучше всего создавать потоки / процессы только из mainloop / GUI.

2 голосов
/ 15 февраля 2011

Пытаясь найти ответ на свой вопрос, я наткнулся на функцию idle_add () PyGTK . Это дает мне следующую возможность:

  1. Создайте новый дочерний процесс, который взаимодействует через очередь.
  2. Создайте поток слушателя, который слушает Очередь, когда дочерний процесс отправляет слушателю сообщение о том, что он завершен, слушатель вызывает idle_add (), который устанавливает обратный вызов.
  3. Во время следующего цикла основного цикла родительский процесс будет вызывать обратный вызов.
  4. Обратный вызов может извлекать результаты, присоединяться к дочернему процессу и присоединяться к потоку слушателя.

Это кажется слишком сложным способом воссоздания Unix-функции call-callback-when-child-process-done.

Это должно быть распространенной проблемой с GUI в Python. Наверняка есть стандартный шаблон для решения этой проблемы?

2 голосов
/ 14 февраля 2011

Вы можете использовать очередь для связи с дочерними процессами. Вы можете прикрепить промежуточные результаты к нему, или сообщения, указывающие, что достигнуты вехи (для индикаторов выполнения), или просто сообщение, указывающее, что процесс готов к присоединению. Опрос с пусто легко и быстро.

Если вы действительно хотите узнать, сделано ли это, вы можете посмотреть код выхода вашего процесса или опроса is_alive () .

0 голосов
/ 14 февраля 2011

посмотрите на модуль подпроцесса:

http://docs.python.org/library/subprocess.html

import subprocess
let pipe = subprocess.Popen("ls -l", stdout=subprocess.PIPE)
allText = pipe.stdout.read()
pipe.wait()
retVal = pipe.returncode
...