Тайм-аут программы в Windows для Python - PullRequest
4 голосов
/ 22 августа 2011

Привет, я видел несколько вопросов здесь на SE, которые касаются подобных проблем, но я нахожу, что многие ответы неясны и сбивают с толку. Мой вопрос очень прост. Как на платформе Windows я могу убить работающую функцию через определенное время, используя Python v2.6? Если у меня есть:

def my_function(start):
    x=start
    while True:
        print x
        x=x+1
    return x

как я могу сделать эту остановку через X секунд? Пожалуйста, держите ваши ответы в чистоте о том, где поставить мою функцию и как настроить ограничение по времени. Спасибо

Ответы [ 3 ]

4 голосов
/ 22 августа 2011

Если вы просто хотите запустить обычную функцию с тайм-аутом, попробуйте:

from datetime import timedelta, datetime
from time import sleep

endtime = datetime.utcnow() + timedelta(seconds = 2)

while True:
    sleep(1) # just an example
    if datetime.utcnow() > endtime: # if more than two seconds has elapsed
        break

Если вы говорите об остановке цепочки, есть запись в блоге о том, как сделать это с помощью цепочек, которая охватывает всеThe Base.

Как (не) установить время ожидания для вычисления в Python . автор также использует этот сайт.

По сути, ответ заключается в том, что в Python нет "правильного способа" сделать это, хотя, если вы не работаете в Windows, SIGALARM работает.

1 голос
/ 22 августа 2011

Вы можете использовать этот модуль timeout.py в своих интересах. Его использование требует, чтобы функция была написана и пропущена (или украшена) функцией add_timeout в модуле.

import timeout

def my_function(start):
    x=start
    while True:
        print x
        x=x+1
    return x

my_function = timeout.add_timeout(my_function, seconds)

После вызова новой «функции» вы можете запросить свойство ready, чтобы выяснить, в каком состоянии он находится:

  • False означает, что функция все еще выполняется.
  • True означает, что выполнение завершено.
  • None означает, что тайм-аут достигнут и функция прекращена.

Если ready равно True, свойство value будет иметь результаты выполнения функций. Если функция вызвала исключение, она будет перезапущена после оценки value. Если value вычислено до того, как ready равно True, timeout.NotReadyError будет повышено.

Примечание: модуль не был разработан специально для декорирования функций, и методы в целом не должны декорироваться . Атрибуты экземпляра не гарантируются для обновления через границу выполнения.


Добавление:

Похоже, что Google Code больше не поддерживает поиск модуля timeout.py . Хотя API немного отличается, я бы порекомендовал модуль asynchronous. Его источник включен ниже, за ним следует второй модуль, используемый для проверки его правильной работы.

asynchronous.py

#! /usr/bin/env python3
import abc as _abc
import collections as _collections
import enum as _enum
import math as _math
import multiprocessing as _multiprocessing
import operator as _operator
import queue as _queue
import signal as _signal
import sys as _sys
import time as _time

__all__ = (
    'Executor',
    'get_timeout',
    'set_timeout',
    'submit',
    'map_',
    'shutdown'
)


class _Base(metaclass=_abc.ABCMeta):
    __slots__ = (
        '__timeout',
    )

    @_abc.abstractmethod
    def __init__(self, timeout):
        self.timeout = _math.inf if timeout is None else timeout

    def get_timeout(self):
        return self.__timeout

    def set_timeout(self, value):
        if not isinstance(value, (float, int)):
            raise TypeError('value must be of type float or int')
        if value <= 0:
            raise ValueError('value must be greater than zero')
        self.__timeout = value

    timeout = property(get_timeout, set_timeout)


def _run_and_catch(fn, args, kwargs):
    # noinspection PyPep8,PyBroadException
    try:
        return False, fn(*args, **kwargs)
    except:
        return True, _sys.exc_info()[1]


def _run(fn, args, kwargs, queue):
    queue.put_nowait(_run_and_catch(fn, args, kwargs))


class _State(_enum.IntEnum):
    PENDING = _enum.auto()
    RUNNING = _enum.auto()
    CANCELLED = _enum.auto()
    FINISHED = _enum.auto()
    ERROR = _enum.auto()


def _run_and_catch_loop(iterable, *args, **kwargs):
    exception = None
    for fn in iterable:
        error, value = _run_and_catch(fn, args, kwargs)
        if error:
            exception = value
    if exception:
        raise exception


class _Future(_Base):
    __slots__ = (
        '__queue',
        '__process',
        '__start_time',
        '__callbacks',
        '__result'
    )

    def __init__(self, timeout, fn, args, kwargs):
        super().__init__(timeout)
        self.__queue = _multiprocessing.Queue(1)
        self.__process = _multiprocessing.Process(
            target=_run,
            args=(fn, args, kwargs, self.__queue),
            daemon=True
        )
        self.__start_time = _math.inf
        self.__callbacks = _collections.deque()
        self.__result = True, TimeoutError()

    @property
    def __state(self):
        pid, exitcode = self.__process.pid, self.__process.exitcode
        return (_State.PENDING if pid is None else
                _State.RUNNING if exitcode is None else
                _State.CANCELLED if exitcode == -_signal.SIGTERM else
                _State.FINISHED if exitcode == 0 else
                _State.ERROR)

    def __repr__(self):
        root = f'{type(self).__name__} at {id(self)} state={self.__state.name}'
        if self.__state < _State.CANCELLED:
            return f'<{root}>'
        error, value = self.__result
        suffix = f'{"raised" if error else "returned"} {type(value).__name__}'
        return f'<{root} {suffix}>'

    def __consume_callbacks(self):
        while self.__callbacks:
            yield self.__callbacks.popleft()

    def __invoke_callbacks(self):
        self.__process.join()
        _run_and_catch_loop(self.__consume_callbacks(), self)

    def cancel(self):
        self.__process.terminate()
        self.__invoke_callbacks()

    def __auto_cancel(self):
        elapsed_time = _time.perf_counter() - self.__start_time
        if elapsed_time > self.timeout:
            self.cancel()
        return elapsed_time

    def cancelled(self):
        self.__auto_cancel()
        return self.__state is _State.CANCELLED

    def running(self):
        self.__auto_cancel()
        return self.__state is _State.RUNNING

    def done(self):
        self.__auto_cancel()
        return self.__state > _State.RUNNING

    def __handle_result(self, error, value):
        self.__result = error, value
        self.__invoke_callbacks()

    def __ensure_termination(self):
        elapsed_time = self.__auto_cancel()
        if not self.__queue.empty():
            self.__handle_result(*self.__queue.get_nowait())
        elif self.__state < _State.CANCELLED:
            remaining_time = self.timeout - elapsed_time
            if remaining_time == _math.inf:
                remaining_time = None
            try:
                result = self.__queue.get(True, remaining_time)
            except _queue.Empty:
                self.cancel()
            else:
                self.__handle_result(*result)

    def result(self):
        self.__ensure_termination()
        error, value = self.__result
        if error:
            raise value
        return value

    def exception(self):
        self.__ensure_termination()
        error, value = self.__result
        if error:
            return value

    def add_done_callback(self, fn):
        if self.done():
            fn(self)
        else:
            self.__callbacks.append(fn)

    def _set_running_or_notify_cancel(self):
        if self.__state is _State.PENDING:
            self.__process.start()
            self.__start_time = _time.perf_counter()
        else:
            self.cancel()


class Executor(_Base):
    __slots__ = (
        '__futures',
    )

    def __init__(self, timeout=None):
        super().__init__(timeout)
        self.__futures = set()

    def submit(self, fn, *args, **kwargs):
        future = _Future(self.timeout, fn, args, kwargs)
        self.__futures.add(future)
        future.add_done_callback(self.__futures.remove)
        # noinspection PyProtectedMember
        future._set_running_or_notify_cancel()
        return future

    @staticmethod
    def __cancel_futures(iterable):
        _run_and_catch_loop(map(_operator.attrgetter('cancel'), iterable))

    def map(self, fn, *iterables):
        futures = tuple(self.submit(fn, *args) for args in zip(*iterables))

        def result_iterator():
            future_iterator = iter(futures)
            try:
                for future in future_iterator:
                    yield future.result()
            finally:
                self.__cancel_futures(future_iterator)

        return result_iterator()

    def shutdown(self):
        self.__cancel_futures(frozenset(self.__futures))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()
        return False


_executor = Executor()
get_timeout = _executor.get_timeout
set_timeout = _executor.set_timeout
submit = _executor.submit
map_ = _executor.map
shutdown = _executor.shutdown
del _executor

test_asynchronous.py

#! /usr/bin/env python3
import _thread
import atexit
import functools
import inspect
import io
import math
import operator
import os
import sys
import time
import unittest

import asynchronous


# noinspection PyUnresolvedReferences
class TestConstructor:
    def instantiate(self, *args):
        parameters = len(inspect.signature(self.CLASS).parameters)
        return self.CLASS(*args[:parameters])

    def test_valid_timeout(self):
        instance = self.instantiate(None, print, (), {})
        self.assertEqual(instance.get_timeout(), math.inf)
        instance = self.instantiate(1, print, (), {})
        self.assertEqual(instance.get_timeout(), 1)
        float_timeout = (math.e ** (1j * math.pi) + 1).imag
        self.assertIsInstance(float_timeout, float)
        instance = self.instantiate(float_timeout, print, (), {})
        self.assertEqual(instance.get_timeout(), float_timeout)

    def test_error_timeout(self):
        self.assertRaises(TypeError, self.instantiate, '60', print, (), {})
        self.assertRaises(ValueError, self.instantiate, 0, print, (), {})
        self.assertRaises(ValueError, self.instantiate, -1, print, (), {})


# noinspection PyUnresolvedReferences
class TestTimeout(TestConstructor):
    def test_valid_property(self):
        instance = self.instantiate(None, None, None, None)
        instance.timeout = 1
        self.assertIsInstance(instance.timeout, int)
        instance.timeout = 1 / 2
        self.assertIsInstance(instance.timeout, float)
        kilo_bit = int.from_bytes(os.urandom(1 << 7), 'big')
        instance.timeout = kilo_bit
        self.assertEqual(instance.timeout, kilo_bit)

    def test_error_property(self):
        instance = self.instantiate(None, None, None, None)
        for exception, value in (
                (TypeError, 'inf'),
                (TypeError, complex(123456789, 0)),
                (ValueError, 0),
                (ValueError, 0.0),
                (ValueError, -1),
                (ValueError, -math.pi)
        ):
            with self.assertRaises(exception):
                instance.timeout = value
            self.assertEqual(instance.timeout, math.inf)


class Timer:
    __timers = {}

    @classmethod
    def start_timer(cls):
        ident, now = _thread.get_ident(), time.perf_counter()
        if now is not cls.__timers.setdefault(ident, now):
            raise KeyError(ident)

    @classmethod
    def stop_timer(cls, expected_time, error=None):
        if error is None:
            error = 1 / 4  # the default is a quarter second
        used = time.perf_counter() - cls.__timers.pop(_thread.get_ident())
        diff = used - expected_time
        return -error <= diff <= +error


# noinspection PyUnresolvedReferences
class TestTimer(Timer):
    def stop_timer(self, expected_time, error=None):
        self.assertTrue(super().stop_timer(expected_time, error))


def delay_run(delay, fn, *args, sync=True, **kwargs):
    def wrapper():
        time.sleep(delay)
        return fn(*args, **kwargs)

    if sync:
        return wrapper()
    _thread.start_new_thread(wrapper, ())


# noinspection PyUnresolvedReferences
class TestModuleOrInstance(TestTimer):
    @property
    def moi(self):
        return self.MODULE_OR_INSTANCE

    def test_valid_timeout(self):
        self.moi.set_timeout(math.inf)
        self.assertEqual(self.moi.get_timeout(), math.inf)
        self.moi.set_timeout(60)
        self.assertEqual(self.moi.get_timeout(), 60)
        self.moi.set_timeout(0.05)
        self.assertEqual(self.moi.get_timeout(), 0.05)

    def test_error_timeout(self):
        self.moi.set_timeout(math.inf)
        self.assertRaises(TypeError, self.moi.set_timeout, None)
        self.assertEqual(self.moi.get_timeout(), math.inf)
        self.assertRaises(ValueError, self.moi.set_timeout, 0)
        self.assertEqual(self.moi.get_timeout(), math.inf)
        self.assertRaises(ValueError, self.moi.set_timeout, -1)
        self.assertEqual(self.moi.get_timeout(), math.inf)

    def run_submit_check(self):
        self.start_timer()
        future = self.moi.submit(delay_run, 0.5, operator.add, 1, 2)
        self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.5)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=FINISHED returned int>$'
        )

    def test_submit_one_second_timeout(self):
        self.moi.set_timeout(1)
        self.run_submit_check()

    def test_submit_no_timeout(self):
        self.moi.set_timeout(math.inf)
        self.run_submit_check()

    def test_submit_short_timeout(self):
        self.moi.set_timeout(0.5)
        self.start_timer()
        future = self.moi.submit(delay_run, 1, operator.add, 1, 2)
        self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
        self.assertIsInstance(future.exception(), TimeoutError)
        self.stop_timer(0.5)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=CANCELLED raised TimeoutError>$'
        )

    def run_map(self, *args):
        return getattr(self.moi, self.NAME_OF_MAP)(delay_run, *args)

    def test_valid_map(self):
        self.moi.set_timeout(1.5)
        for result in self.run_map(
                [1, 1, 1, 1],
                [operator.add] * 4,
                [0, 1, 2, 3],
                [3, 2, 1, 0]
        ):
            self.assertEqual(result, 3)

    def test_error_map(self):
        self.moi.set_timeout(1.5)
        success = 0
        with self.assertRaises(TimeoutError):
            for result in self.run_map(
                    [1, 1, 2, 1],
                    [operator.add] * 4,
                    [0, 1, 2, 3],
                    [3, 2, 1, 0]
            ):
                self.assertEqual(result, 3)
                success += 1
        self.assertEqual(success, 2)

    def run_shutdown_check(self, running, future):
        self.assertRaises(TimeoutError, future.result)
        running.remove(future)

    def run_submit_loop(self, executor):
        running = set()
        done_callback = functools.partial(self.run_shutdown_check, running)
        for _ in range(10):
            future = executor.submit(delay_run, 2, operator.add, 10, 20)
            running.add(future)
            future.add_done_callback(done_callback)
        time.sleep(0.5)
        return running

    def test_valid_shutdown(self):
        self.moi.set_timeout(1.5)
        running = self.run_submit_loop(self.moi)
        self.moi.shutdown()
        self.assertFalse(running)

    def test_error_shutdown(self):
        self.moi.set_timeout(1.5)
        running = self.run_submit_loop(self.moi)
        running.pop()
        self.assertRaises(KeyError, self.moi.shutdown)
        self.assertFalse(running)


class TestExecutorAPI(TestTimeout, TestModuleOrInstance, unittest.TestCase):
    CLASS = asynchronous.Executor
    MODULE_OR_INSTANCE = CLASS()
    NAME_OF_MAP = 'map'

    def test_valid_context_manager(self):
        with self.instantiate(1.5) as executor:
            running = self.run_submit_loop(executor)
        self.assertFalse(running)

    def test_error_context_manager(self):
        error = Exception()
        with self.assertRaises(Exception) as cm:
            with self.instantiate(1.5) as executor:
                running = self.run_submit_loop(executor)
                raise error
        self.assertIs(cm.exception, error)
        self.assertFalse(running)
        with self.assertRaises(KeyError):
            with self.instantiate(1.5) as executor:
                running = self.run_submit_loop(executor)
                running.pop()
        self.assertFalse(running)


class TestModuleAPI(TestModuleOrInstance, unittest.TestCase):
    MODULE_OR_INSTANCE = asynchronous
    NAME_OF_MAP = 'map_'


def verify_error():
    sys.stderr.seek(0, io.SEEK_SET)
    for line in sys.stderr:
        if line == 'queue.Full\n':
            break
    else:
        sys.stderr.seek(0, io.SEEK_SET)
        sys.__stderr__.write(sys.stderr.read())
        sys.__stderr__.flush()


def cause_error(obj):
    sys.stderr = io.StringIO()
    atexit.register(verify_error)
    inspect.currentframe().f_back.f_back.f_locals['queue'].put_nowait(obj)


def return_(obj):
    return obj


# noinspection PyUnusedLocal
def throw(exception, *args):
    raise exception


class TestFutureAPI(TestTimer, TestTimeout, unittest.TestCase):
    CLASS = asynchronous._Future

    def test_valid_representation(self):
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertRegex(repr(future), r'^<_Future at \d+ state=PENDING>$')
        future._set_running_or_notify_cancel()
        self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
        future._set_running_or_notify_cancel()
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=CANCELLED raised TimeoutError>$'
        )
        future = self.instantiate(None, time.sleep, (0.1,), {})
        future._set_running_or_notify_cancel()
        time.sleep(0.5)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=FINISHED raised TimeoutError>$'
        )
        self.assertIs(future.exception(), None)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=FINISHED returned NoneType>$'
        )

    def test_error_representation(self):
        future = self.instantiate(0.5, cause_error, (None,), {})
        future._set_running_or_notify_cancel()
        self.assertRaises(TypeError, future.result)
        self.assertIsInstance(future.exception(), TimeoutError)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=ERROR raised TimeoutError>$'
        )
        future = self.instantiate(0.5, cause_error, ((False, 'okay'),), {})
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 'okay')
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=ERROR returned str>$'
        )

    def test_cancel(self):
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertRaises(AttributeError, future.cancel)
        future._set_running_or_notify_cancel()
        future.cancel()
        self.assertTrue(future.cancelled())
        future = self.instantiate(None, time.sleep, (0.1,), {})
        checker = set()
        future.add_done_callback(checker.add)
        future._set_running_or_notify_cancel()
        future.cancel()
        future.cancel()
        self.assertIs(checker.pop(), future)
        self.assertFalse(checker)

    def test_cancelled(self):
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.cancelled())
        future._set_running_or_notify_cancel()
        self.assertFalse(future.cancelled())
        self.assertIs(future.result(), None)
        self.assertFalse(future.cancelled())
        future = self.instantiate(None, time.sleep, (0.1,), {})
        future._set_running_or_notify_cancel()
        future.cancel()
        self.assertTrue(future.cancelled())
        future = self.instantiate(0.1, time.sleep, (1,), {})
        future._set_running_or_notify_cancel()
        time.sleep(0.5)
        self.assertTrue(future.cancelled())

    def test_running(self):
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.running())
        future._set_running_or_notify_cancel()
        self.assertTrue(future.running())
        self.assertIs(future.result(), None)
        self.assertFalse(future.running())
        future = self.instantiate(None, time.sleep, (0.1,), {})
        future._set_running_or_notify_cancel()
        future.cancel()
        self.assertFalse(future.running())
        future = self.instantiate(0.1, time.sleep, (1,), {})
        future._set_running_or_notify_cancel()
        time.sleep(0.5)
        self.assertFalse(future.running())

    def test_done(self):
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.done())
        future._set_running_or_notify_cancel()
        self.assertFalse(future.done())
        self.assertIs(future.result(), None)
        self.assertTrue(future.done())
        future = self.instantiate(None, time.sleep, (None,), {})
        future._set_running_or_notify_cancel()
        self.assertIsInstance(future.exception(), TypeError)
        self.assertTrue(future.done())

    def test_result_immediate(self):
        data = os.urandom(1 << 20)
        future = self.instantiate(None, return_, (data,), {})
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), data)
        test_exception = Exception('test')
        future = self.instantiate(None, throw, (test_exception,), {})
        future._set_running_or_notify_cancel()
        with self.assertRaises(Exception) as cm:
            future.result()
        self.assertIsInstance(cm.exception, type(test_exception))
        self.assertEqual(cm.exception.args, test_exception.args)

    def test_result_delay(self):
        future = self.instantiate(None, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.1)
        future = self.instantiate(None, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 5)
        self.stop_timer(1)
        future = self.instantiate(0.5, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.1)
        future = self.instantiate(0.5, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertRaises(TimeoutError, future.result)
        self.stop_timer(0.5)

    def test_result_before_running(self):
        future = self.instantiate(0.1, delay_run, (0, operator.add, 1, 2), {})
        delay_run(0.5, future._set_running_or_notify_cancel, sync=False)
        self.start_timer()
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.5)

    def test_exception_immediate(self):
        data = os.urandom(1 << 20)
        future = self.instantiate(None, return_, (data,), {})
        future._set_running_or_notify_cancel()
        self.assertIs(future.exception(), None)
        test_exception = Exception('test')
        future = self.instantiate(None, throw, (test_exception,), {})
        future._set_running_or_notify_cancel()
        self.assertIsInstance(future.exception(), type(test_exception))
        self.assertEqual(future.exception().args, test_exception.args)

    def test_exception_delay(self):
        future = self.instantiate(None, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIs(future.exception(), None)
        self.stop_timer(0.1)
        future = self.instantiate(None, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIs(future.exception(), None)
        self.stop_timer(1)
        future = self.instantiate(0.5, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIs(future.exception(), None)
        self.stop_timer(0.1)
        future = self.instantiate(0.5, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIsInstance(future.exception(), TimeoutError)
        self.assertFalse(future.exception().args)
        self.stop_timer(0.5)

    def test_exception_before_running(self):
        future = self.instantiate(0.1, delay_run, (0, operator.add, 1, 2), {})
        delay_run(0.5, future._set_running_or_notify_cancel, sync=False)
        self.start_timer()
        self.assertIs(future.exception(), None)
        self.stop_timer(0.5)

    def test_valid_add_done_callback(self):
        future = self.instantiate(None, time.sleep, (0,), {})
        requires_callback = {future}
        future.add_done_callback(requires_callback.remove)
        self.assertIn(future, requires_callback)
        future._set_running_or_notify_cancel()
        self.assertIs(future.exception(), None)
        self.assertFalse(requires_callback)
        requires_callback.add(future)
        future.add_done_callback(requires_callback.remove)
        self.assertFalse(requires_callback)

    def test_error_add_done_callback(self):
        future = self.instantiate(None, time.sleep, (0,), {})
        requires_callback = [{future} for _ in range(10)]
        callbacks = [s.remove for s in requires_callback]
        error = Exception()
        callbacks.insert(5, functools.partial(throw, error))
        for fn in callbacks:
            future.add_done_callback(fn)
        future._set_running_or_notify_cancel()
        with self.assertRaises(Exception) as cm:
            future.exception()
        self.assertIs(cm.exception, error)
        self.assertFalse(any(requires_callback))

    def test_set_running_or_notify_cancel(self):
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.running() or future.done())
        future._set_running_or_notify_cancel()
        self.assertTrue(future.running())
        future._set_running_or_notify_cancel()
        self.assertTrue(future.cancelled())


if __name__ == '__main__':
    unittest.main()
1 голос
/ 22 августа 2011

Простое решение:

Посмотрите на модуль datetime. Вы можете сохранить временную метку при запуске цикла и вычислить временную дельту в части while.

Если timedelta больше, чем та, которую вы хотите получить, оторвитесь от цикла, вызвав "break".

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...