Подождите не более N секунд, пока дочерний PID не завершится - PullRequest
2 голосов
/ 29 марта 2012

Я пытаюсь использовать os.fork, os.waitpid и многопоточность для передачи трудоемкой работы на дочерний процесс, подождите максимум определенное количество времени для его завершения,и затем продолжайте, оставляя его работать в фоновом режиме.У дочернего элемента также есть вторичный таймер, который не позволяет ему слишком долго работать в фоновом режиме, например:

Fork execution
In the fork's child:
    Start a thread to execute a task
        If that thread runs for longer than X milliseconds:
             thread.stop(), a custom method that softly stops the thread.
In the fork's parent:
    If the child's PID stays running for longer than Y milliseconds:
        return -1
    else:
        return 0

После возврата значения я хочу, чтобы скрипт завершился.Ребенок может и должен продолжать бегать, если это не сделано.

Код, который я пробовал (сокращенно):

class MyCustomThread(threading.Thread):
    abort = False
    def run(self):
        counter = 0
        while True:
            if self.abort: break
            counter += 1
            if counter == 30: self.stop()
            sleep(1)
        return 0

    def stop(self):
        self.abort = True

def test():
    X = 2000
    Y = 500
    pid = os.fork()
    if pid == 0:
        thread1 = MyCustomThread() #Sleeps for 30 seconds and ends.
        thread1.start()
        print "Started 1!"
        timeout = X # say, 1000ms
        while timeout > 0:
            if not thread1.is_alive(): return "custom thread finished before the deadline!"
            timeout -= 1
            sleep(0.001)
        if thread1.is_alive():
            return "custom thread didn't finish before the deadline!"
            thread1.stop()
        exit()
    else:
        print pid
        thread2 = Thread(target = os.waitpid, args = (pid, 0))
        thread2.start()
        print "Started 2!"
        timeout2 = Y # say, 500ms
        while timeout2 > 0:
            if not thread2.is_alive(): return "child PID finished!"
            timeout2 -= 1
            sleep(0.001)
        if thread2.is_alive():
            return "child PID didn't finish yet!"
print test()
print "all done!"

Вывод правильный, в этом я получаю

1171
Started 2!
Started 1!
child PID didn't finish yet!
all done!
custom thread didn't finish before the deadline!
all done!

Но тогда скрипт не выйдет!Он спит в течение оставшихся 28 секунд, прежде чем

Как завершить основное выполнение этого скрипта после того, как родитель fork вернет значение? Как я уже говорил, ребенок можети должен продолжать работать в фоновом режиме, он просто не должен блокировать выполнение следующей задачи на родительском.

Мне действительно все равно, сможет ли ребенок распечатать вывод на стандартный вывод - в реальной реализации все, что он делает, - это общается с базой данных, поэтому ему не нужно ничего сообщать в интерактивном режиме.Родитель, однако, должен отправить дочернему элементу, подождать самое большее Y секунд, чтобы дочерний процесс завершил работу, а затем (насколько это касается того, кто вызывал скрипт), завершить выполнение скрипта, чтобы следующая вещьосуществимо.Я думаю, что другой таймер (X) не имеет значения;он существует только для того, чтобы ребенок не бегал слишком долго в фоновом режиме.

Есть идеи?Я, вероятно, подхожу к этому совершенно неверно, так что "начните сначала и делайте это _ way" идеи приветствуются.

Ответы [ 4 ]

2 голосов
/ 29 марта 2012

Попробуйте это, оно не использует многопоточность, просто чистый fork / waitpid / alarm / sighandler:

child_exited = False

def sigh(signum, frame):
    global child_exited
    if signum == signal.SIGALRM:
        print "custom thread didn't finish before the deadline!"
        #forced exit:
        exit(1)     

    if signum == signal.SIGCHLD:
        (pid, status) = os.waitpid(-1, 0) 
        print "child exited with status: " + str(os.WEXITSTATUS(status))
        child_exited = True

def test():
    global child_exited
    pid = os.fork()
    if pid == 0:
        signal.signal(signal.SIGALRM, sigh)
        signal.alarm(30)

        #do work:
        print "Started 1"
        time.sleep(60)

        #clean exit:
        exit(0)
    elif (pid > 0):
        signal.signal(signal.SIGCHLD, sigh)
        print "Started 2"
        #this sleep will return prematurely if the child exits
        time.sleep(10)

        if not child_exited:
          print "child PID didn't finish yet!"
    else:
        print "fork() failed"

print test()
print "all done!"
1 голос
/ 29 марта 2012

Вы можете использовать join метод Thread

thread2.join(timeout=Y)
if not thread2.is_alive():
   # TODO: Post Processing
1 голос
/ 29 марта 2012

Это не точный ответ на ваш вопрос, а скорее идея "начни сначала и делай это _путь".

Вы можете использовать модуль multiprocessing.Функция Pool.apply_async() позволяет выполнять функцию в фоновом режиме, а возвращенные AsyncResult признаки объекта wait() и get() методов с параметром timeout.

Ваш код по существу станет (не проверенным)

import multiprocessing

def computationally_intensive():
    # whatever

p = multiprocessing.Pool(1)
deferred = p.apply_async(computationally_intensive)
try:
    result = deferred.get(10)              # 10 being the timeout
except multiprocessing.TimeoutError:
    # Handle time out
# ...
p.terminate()
0 голосов
/ 29 марта 2012

Разобрался, спасибо за все указатели! Редактировать: исправлена ​​is_pid_alive функция для работы, если вызывается также изнутри ребенка.

Проблема заключалась в том, что родительский поток "наблюдатель" никогда не завершал работу, поскольку os.waitpid не является функцией, пригодной для цикла или цикла. Решение состояло в том, чтобы удалить поток «наблюдатель» и вместо этого реализовать цикл опроса, который проверяет функцию pid_is_alive() каждую миллисекунду, например:

def pid_is_alive(pid):
    try:
        os.kill(pid, 0)
        os.waitpid(pid, os.WNOHANG)
        os.kill(pid, 0)
    except OSError:
        return False
    return True


def test():
    X = 1000 * 1000
    Y = 5000
    pid = os.fork()
    if pid == 0:
        thread1 =  MyCustomThread() #Sleeps for 30 seconds and ends.
        thread1.start()
        print "Started 1!"
        timeout = X # say, 1000ms
        while timeout > 0:
            if not thread1.is_alive(): return "custom thread finished before the deadline!"
            timeout -= 1
            sleep(0.001)
        if thread1.is_alive():
            return "custom thread didn't finish before the deadline!"
            thread1.stop()
        exit()

    else:
        timeout2 = Y # say, 500ms
        while timeout2 > 0:
            if not pid_is_alive(pid): return "child PID finished!"
            timeout2 -= 1
            sleep(0.001)
        if pid_is_alive(pid):
            print "child PID didn't finish yet!"
            exit()
print test()
print "all done!"
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...