Как ограничить время выполнения вызова функции в Python - PullRequest
63 голосов
/ 14 декабря 2008

В моем коде есть вызов функции, связанной с сокетом, эта функция из другого модуля, таким образом, вне моего контроля, проблема в том, что она время от времени блокируется на несколько часов, что совершенно неприемлемо. Как я могу ограничить время выполнения функции с мой код? Я думаю, что решение должно использовать другой поток.

Ответы [ 9 ]

75 голосов
/ 02 марта 2009

Улучшение ответа @ rik.the.vik будет заключаться в использовании оператора with для придания функции времени ожидания некоторого синтаксического сахара:

import signal
from contextlib import contextmanager

class TimeoutException(Exception): pass

@contextmanager
def time_limit(seconds):
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")
    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)


try:
    with time_limit(10):
        long_function_call()
except TimeoutException as e:
    print("Timed out!")
36 голосов
/ 14 декабря 2008

Я не уверен, насколько кроссплатформенным это может быть, но использование сигналов и сигналов тревоги может быть хорошим способом посмотреть на это. Немного поработав, вы можете сделать это полностью универсальным и пригодным для использования в любой ситуации.

http://docs.python.org/library/signal.html

Итак, ваш код будет выглядеть примерно так.

import signal

def signal_handler(signum, frame):
    raise Exception("Timed out!")

signal.signal(signal.SIGALRM, signal_handler)
signal.alarm(10)   # Ten seconds
try:
    long_function_call()
except Exception, msg:
    print "Timed out!"
17 голосов
/ 31 октября 2014

Вот способ Linux / OSX ограничить время работы функции. Это в том случае, если вы не хотите использовать потоки и хотите, чтобы ваша программа ожидала завершения функции или истечения срока.

from multiprocessing import Process
from time import sleep

def f(time):
    sleep(time)


def run_with_limited_time(func, args, kwargs, time):
    """Runs a function with time limit

    :param func: The function to run
    :param args: The functions args, given as tuple
    :param kwargs: The functions keywords, given as dict
    :param time: The time limit in seconds
    :return: True if the function ended successfully. False if it was terminated.
    """
    p = Process(target=func, args=args, kwargs=kwargs)
    p.start()
    p.join(time)
    if p.is_alive():
        p.terminate()
        return False

    return True


if __name__ == '__main__':
    print run_with_limited_time(f, (1.5, ), {}, 2.5) # True
    print run_with_limited_time(f, (3.5, ), {}, 2.5) # False
12 голосов
/ 06 июня 2016

Я предпочитаю подход с использованием диспетчера контекста, поскольку он позволяет выполнять несколько операторов Python внутри оператора with time_limit. Поскольку система Windows не имеет SIGALARM, более портативный и, возможно, более простой метод может использовать Timer

from contextlib import contextmanager
import threading
import _thread

class TimeoutException(Exception):
    def __init__(self, msg=''):
        self.msg = msg

@contextmanager
def time_limit(seconds, msg=''):
    timer = threading.Timer(seconds, lambda: _thread.interrupt_main())
    timer.start()
    try:
        yield
    except KeyboardInterrupt:
        raise TimeoutException("Timed out for operation {}".format(msg))
    finally:
        # if the action ends in specified time, timer is canceled
        timer.cancel()

import time
# ends after 5 seconds
with time_limit(5, 'sleep'):
    for i in range(10):
        time.sleep(1)

# this will actually end after 10 seconds
with time_limit(5, 'sleep'):
    time.sleep(10)

Основным приемом здесь является использование _thread.interrupt_main для прерывания основного потока из потока таймера. Одно предупреждение: основной поток не всегда быстро реагирует на KeyboardInterrupt, вызванный Timer. Например, time.sleep() вызывает системную функцию, поэтому KeyboardInterrupt будет обработан после вызова sleep.

7 голосов
/ 12 июля 2009

Делать это из обработчика сигнала опасно: вы можете находиться внутри обработчика исключений в момент возникновения исключения и оставлять вещи в неисправном состоянии. Например,

def function_with_enforced_timeout():
  f = open_temporary_file()
  try:
   ...
  finally:
   here()
   unlink(f.filename)

Если здесь возникло исключение (), временный файл никогда не будет удален.

Решение здесь заключается в том, чтобы асинхронные исключения откладывались до тех пор, пока код не окажется внутри кода обработки исключений (исключение или окончание блока), но Python этого не делает.

Обратите внимание, что это ничего не помешает при выполнении нативного кода; он прервет его только когда функция вернется, так что это может не помочь в этом конкретном случае. (Сам SIGALRM может прервать блокирующий вызов, но код сокета обычно просто повторяется после EINTR.)

Лучше сделать это с помощью потоков, поскольку он более переносим, ​​чем сигналы. Поскольку вы запускаете рабочий поток и блокируете его до конца, нет никаких обычных проблем с параллелизмом. К сожалению, нет способа доставить исключение асинхронно другому потоку в Python (другие API потока могут сделать это). У него также будет та же проблема с отправкой исключения во время обработчика исключения, и потребуется такое же исправление.

5 голосов
/ 14 декабря 2008

Вам не нужно использовать темы. Вы можете использовать другой процесс для выполнения работы по блокировке, например, используя модуль subprocess . Если вы хотите обмениваться структурами данных между различными частями вашей программы, тогда Twisted - это отличная библиотека для того, чтобы вы могли сами контролировать это, и я рекомендую ее, если вы хотите заблокировать и ожидаете, что эта проблема много. Плохая новость с Twisted заключается в том, что вы должны переписать свой код, чтобы избежать каких-либо блокировок, и есть хорошая кривая обучения.

Вы можете использовать потоки, чтобы избежать блокировки, но я бы расценил это как последнее средство, так как это подвергает вас целому миру боли. Прочитайте хорошую книгу о параллелизме, прежде чем даже подумать об использовании потоков в производстве, например, Жан Бэкон "Параллельные системы". Я работаю с группой людей, которые действительно крутят высокопроизводительные вещи с помощью потоков, и мы не внедряем потоки в проекты, если они нам действительно не нужны.

4 голосов
/ 14 декабря 2008

Единственный «безопасный» способ сделать это на любом языке - это использовать вторичный процесс для выполнения этого тайм-аута, в противном случае вам нужно построить свой код таким образом, чтобы он сам по себе был безопасным, например, проверяя время, прошедшее в цикле или тому подобное. Если изменение метода не является вариантом, потока будет недостаточно.

Почему? Потому что вы рискуете оставить вещи в плохом состоянии, когда вы это делаете. Если поток просто прерван в середине метода, удерживаемые блокировки и т. Д. Будут просто удерживаться и не могут быть освобождены.

Итак, посмотрите на путь процесса, не посмотрите на путь потока.

1 голос
/ 27 января 2016

Я обычно предпочитаю использовать контекстный менеджер, как предложено @ josh-lee

Но если кто-то заинтересован в том, чтобы это было реализовано в качестве декоратора, вот альтернатива.

Вот как это будет выглядеть:

import time
from timeout import timeout

class Test(object):
    @timeout(2)
    def test_a(self, foo, bar):
        print foo
        time.sleep(1)
        print bar
        return 'A Done'

    @timeout(2)
    def test_b(self, foo, bar):
        print foo
        time.sleep(3)
        print bar
        return 'B Done'

t = Test()
print t.test_a('python', 'rocks')
print t.test_b('timing', 'out')

А это модуль timeout.py:

import threading

class TimeoutError(Exception):
    pass

class InterruptableThread(threading.Thread):
    def __init__(self, func, *args, **kwargs):
        threading.Thread.__init__(self)
        self._func = func
        self._args = args
        self._kwargs = kwargs
        self._result = None

    def run(self):
        self._result = self._func(*self._args, **self._kwargs)

    @property
    def result(self):
        return self._result


class timeout(object):
    def __init__(self, sec):
        self._sec = sec

    def __call__(self, f):
        def wrapped_f(*args, **kwargs):
            it = InterruptableThread(f, *args, **kwargs)
            it.start()
            it.join(self._sec)
            if not it.is_alive():
                return it.result
            raise TimeoutError('execution expired')
        return wrapped_f

Выход:

python
rocks
A Done
timing
Traceback (most recent call last):
  ...
timeout.TimeoutError: execution expired
out

Обратите внимание, что даже если выдается TimeoutError, декорированный метод будет продолжать выполняться в другом потоке. Если вы также хотите, чтобы этот поток был «остановлен», смотрите: Есть ли способ убить поток в Python?

1 голос
/ 15 декабря 2008

Вот функция тайм-аута, я думаю, что я нашел через Google, и она работает для меня.

Из: http://code.activestate.com/recipes/473878/

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    '''This function will spwan a thread and run the given function using the args, kwargs and 
    return the given default value if the timeout_duration is exceeded 
    ''' 
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            try:
                self.result = func(*args, **kwargs)
            except:
                self.result = default
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return it.result
    else:
        return it.result   
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...