Как добавить время ожидания для функции в Python - PullRequest
23 голосов
/ 04 февраля 2010

В прошлом было предпринято много попыток добавить в Python функциональность тайм-аута, чтобы по истечении заданного временного ограничения код ожидания мог двигаться дальше.К сожалению, предыдущие рецепты либо позволяли запущенной функции продолжать работать и потреблять ресурсы, либо убивали функцию, используя специфичный для платформы метод завершения потока.Цель этой вики - разработать кросс-платформенный ответ на эту проблему, который многие программисты должны были решать для различных программных проектов.

#! /usr/bin/env python
"""Provide way to add timeout specifications to arbitrary functions.

There are many ways to add a timeout to a function, but no solution
is both cross-platform and capable of terminating the procedure. This
module use the multiprocessing module to solve both of those problems."""

################################################################################

__author__ = 'Stephen "Zero" Chappell <Noctis.Skytower@gmail.com>'
__date__ = '11 February 2010'
__version__ = '$Revision: 3 $'

################################################################################

import inspect
import sys
import time
import multiprocessing

################################################################################

def add_timeout(function, limit=60):
    """Add a timeout parameter to a function and return it.

    It is illegal to pass anything other than a function as the first
    parameter. If the limit is not given, it gets a default value equal
    to one minute. The function is wrapped and returned to the caller."""
    assert inspect.isfunction(function)
    if limit <= 0:
        raise ValueError()
    return _Timeout(function, limit)

class NotReadyError(Exception): pass

################################################################################

def _target(queue, function, *args, **kwargs):
    """Run a function with arguments and return output via a queue.

    This is a helper function for the Process created in _Timeout. It runs
    the function with positional arguments and keyword arguments and then
    returns the function's output by way of a queue. If an exception gets
    raised, it is returned to _Timeout to be raised by the value property."""
    try:
        queue.put((True, function(*args, **kwargs)))
    except:
        queue.put((False, sys.exc_info()[1]))

class _Timeout:

    """Wrap a function and add a timeout (limit) attribute to it.

    Instances of this class are automatically generated by the add_timeout
    function defined above. Wrapping a function allows asynchronous calls
    to be made and termination of execution after a timeout has passed."""

    def __init__(self, function, limit):
        """Initialize instance in preparation for being called."""
        self.__limit = limit
        self.__function = function
        self.__timeout = time.clock()
        self.__process = multiprocessing.Process()
        self.__queue = multiprocessing.Queue()

    def __call__(self, *args, **kwargs):
        """Execute the embedded function object asynchronously.

        The function given to the constructor is transparently called and
        requires that "ready" be intermittently polled. If and when it is
        True, the "value" property may then be checked for returned data."""
        self.cancel()
        self.__queue = multiprocessing.Queue(1)
        args = (self.__queue, self.__function) + args
        self.__process = multiprocessing.Process(target=_target,
                                                 args=args,
                                                 kwargs=kwargs)
        self.__process.daemon = True
        self.__process.start()
        self.__timeout = self.__limit + time.clock()

    def cancel(self):
        """Terminate any possible execution of the embedded function."""
        if self.__process.is_alive():
            self.__process.terminate()

    @property
    def ready(self):
        """Read-only property indicating status of "value" property."""
        if self.__queue.full():
            return True
        elif not self.__queue.empty():
            return True
        elif self.__timeout < time.clock():
            self.cancel()
        else:
            return False

    @property
    def value(self):
        """Read-only property containing data returned from function."""
        if self.ready is True:
            flag, load = self.__queue.get()
            if flag:
                return load
            raise load
        raise NotReadyError()

    def __get_limit(self):
        return self.__limit

    def __set_limit(self, value):
        if value <= 0:
            raise ValueError()
        self.__limit = value

    limit = property(__get_limit, __set_limit,
                     doc="Property for controlling the value of the timeout.")

Редактировать: Этот кодбыл написан для Python 3.x и не был предназначен для методов класса в качестве украшения.Модуль multiprocessing не предназначен для изменения экземпляров классов через границы процесса.

Ответы [ 5 ]

12 голосов
/ 04 февраля 2010

Основная проблема в вашем коде - чрезмерное использование предотвращения конфликтов пространства имен с двойным подчеркиванием в классе, который вообще не предназначен для использования в подклассах.

Как правило, self.__foo - это запах кода, который должен сопровождаться комментарием в виде # This is a mixin and we don't want arbitrary subclasses to have a namespace conflict.

Далее клиентский API этого метода будет выглядеть так:

def mymethod(): pass

mymethod = add_timeout(mymethod, 15)

# start the processing    
timeout_obj = mymethod()
try:
    # access the property, which is really a function call
    ret = timeout_obj.value
except TimeoutError:
    # handle a timeout here
    ret = None

Это совсем не питон, и лучший клиентский API будет:

@timeout(15)
def mymethod(): pass

try:
    my_method()
except TimeoutError:
    pass

Вы используете @property в своем классе для чего-то, что является средством доступа, изменяющим состояние, это не очень хорошая идея. Например, что произойдет, если доступ к .value будет выполнен дважды? Похоже, что он потерпит неудачу, потому что queue.get () вернет корзину, потому что очередь уже пуста.

Полностью удалить @property. Не используйте его в этом контексте, он не подходит для вашего варианта использования. Сделайте call block при вызове и верните значение или вызовите само исключение. Если вам действительно нужно получить доступ к значению позже, сделайте его методом .get () или .value ().

Этот код для _target следует немного переписать:

def _target(queue, function, *args, **kwargs):
    try:
        queue.put((True, function(*args, **kwargs)))
    except:
        queue.put((False, exc_info())) # get *all* the exec info, don't do exc_info[1]

# then later:
    raise exc_info[0], exc_info[1], exc_info[2]

Таким образом, трассировка стека будет сохранена правильно и видима для программиста.

Я думаю, что вы сделали разумный первый шаг при написании полезной библиотеки, мне нравится использование модуля обработки для достижения целей.

5 голосов
/ 04 февраля 2010

Как получить упомянутый синтаксис декоратора Jerub

def timeout(limit=None):
    if limit is None:
        limit = DEFAULT_TIMEOUT
    if limit <= 0:
        raise TimeoutError() # why not ValueError here?
    def wrap(function):
        return _Timeout(function,limit)
    return wrap

@timeout(15)
def mymethod(): pass
1 голос
/ 19 октября 2016

Библиотека Pebble была разработана для обеспечения кроссплатформенной реализации, способной работать с проблемной логикой, которая может аварийно завершать работу, работать с ошибками или работать бесконечно .

from pebble import concurrent

@concurrent.process(timeout=10)
def function(foo, bar=0):
    return foo + bar

future = function(1, bar=2)

try:
    result = future.result()  # blocks until results are ready
except Exception as error:
    print("Function raised %s" % error)
    print(error.traceback)  # traceback of the function
except TimeoutError as error:
    print("Function took longer than %d seconds" % error.args[1])

Декоратор работает также со статическими и классовыми методами. Тем не менее, я бы не советовал украшать методы, так как это довольно ошибочная практика.

0 голосов
/ 17 мая 2019

Поскольку тестирование крайне важно для обеспечения правильной работы кода, был создан модульный тест для связанного ответа .Он включен ниже и может быть запущен вдоль стороны модуля asynchronous, который он тестирует.

test_asynchronous.py

#! /usr/bin/env python3
import _thread
import atexit
import contextlib
import functools
import inspect
import itertools
import io
import math
import operator
import os
import queue
import sys
import time
import unittest

import asynchronous


# noinspection PyUnresolvedReferences
class TestConstructor:
    def instantiate(self, *args):
        parameters = len(inspect.signature(self.CLASS).parameters)
        return self.CLASS(*args[:parameters])

    def test_valid_timeout(self):
        instance = self.instantiate(None, print, (), {})
        self.assertEqual(instance.get_timeout(), math.inf)
        instance = self.instantiate(1, print, (), {})
        self.assertEqual(instance.get_timeout(), 1)
        float_timeout = (math.e ** (1j * math.pi) + 1).imag
        self.assertIsInstance(float_timeout, float)
        instance = self.instantiate(float_timeout, print, (), {})
        self.assertEqual(instance.get_timeout(), float_timeout)

    def test_error_timeout(self):
        self.assertRaises(TypeError, self.instantiate, '60', print, (), {})
        self.assertRaises(ValueError, self.instantiate, 0, print, (), {})
        self.assertRaises(ValueError, self.instantiate, -1, print, (), {})


# noinspection PyUnresolvedReferences
class TestTimeout(TestConstructor):
    def test_valid_property(self):
        instance = self.instantiate(None, None, None, None)
        instance.timeout = 1
        self.assertIsInstance(instance.timeout, int)
        instance.timeout = 1 / 2
        self.assertIsInstance(instance.timeout, float)
        kilo_bit = int.from_bytes(os.urandom(1 << 7), 'big')
        instance.timeout = kilo_bit
        self.assertEqual(instance.timeout, kilo_bit)

    def test_error_property(self):
        instance = self.instantiate(None, None, None, None)
        for exception, value in (
                (TypeError, 'inf'),
                (TypeError, complex(123456789, 0)),
                (ValueError, 0),
                (ValueError, 0.0),
                (ValueError, -1),
                (ValueError, -math.pi)
        ):
            with self.assertRaises(exception):
                instance.timeout = value
            self.assertEqual(instance.timeout, math.inf)


class Timer:
    __timers = {}

    @classmethod
    def start_timer(cls):
        ident, now = _thread.get_ident(), time.perf_counter()
        if now is not cls.__timers.setdefault(ident, now):
            raise KeyError(ident)

    @classmethod
    def stop_timer(cls, expected_time, error=None):
        if error is None:
            error = 1 / 4  # the default is a quarter second
        used = time.perf_counter() - cls.__timers.pop(_thread.get_ident())
        diff = used - expected_time
        return -error <= diff <= +error


# noinspection PyUnresolvedReferences
class TestTimer(Timer):
    def stop_timer(self, expected_time, error=None):
        self.assertTrue(super().stop_timer(expected_time, error))


def delay_run(delay, fn, *args, sync=True, **kwargs):
    def wrapper():
        time.sleep(delay)
        return fn(*args, **kwargs)

    if sync:
        return wrapper()
    _thread.start_new_thread(wrapper, ())


# noinspection PyUnresolvedReferences
class TestModuleOrInstance(TestTimer):
    @property
    def moi(self):
        return self.MODULE_OR_INSTANCE

    def test_valid_timeout(self):
        self.moi.set_timeout(math.inf)
        self.assertEqual(self.moi.get_timeout(), math.inf)
        self.moi.set_timeout(60)
        self.assertEqual(self.moi.get_timeout(), 60)
        self.moi.set_timeout(0.05)
        self.assertEqual(self.moi.get_timeout(), 0.05)

    def test_error_timeout(self):
        self.moi.set_timeout(math.inf)
        self.assertRaises(TypeError, self.moi.set_timeout, None)
        self.assertEqual(self.moi.get_timeout(), math.inf)
        self.assertRaises(ValueError, self.moi.set_timeout, 0)
        self.assertEqual(self.moi.get_timeout(), math.inf)
        self.assertRaises(ValueError, self.moi.set_timeout, -1)
        self.assertEqual(self.moi.get_timeout(), math.inf)

    def run_submit_check(self):
        self.start_timer()
        future = self.moi.submit(delay_run, 0.5, operator.add, 1, 2)
        self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.5)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=FINISHED returned int>$'
        )

    def test_submit_one_second_timeout(self):
        self.moi.set_timeout(1)
        self.run_submit_check()

    def test_submit_no_timeout(self):
        self.moi.set_timeout(math.inf)
        self.run_submit_check()

    def test_submit_short_timeout(self):
        self.moi.set_timeout(0.5)
        self.start_timer()
        future = self.moi.submit(delay_run, 1, operator.add, 1, 2)
        self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
        self.assertIsInstance(future.exception(), TimeoutError)
        self.stop_timer(0.5)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=CANCELLED raised TimeoutError>$'
        )

    def run_map(self, *args):
        return getattr(self.moi, self.NAME_OF_MAP)(delay_run, *args)

    def test_valid_map(self):
        self.moi.set_timeout(1.5)
        for result in self.run_map(
                [1, 1, 1, 1],
                [operator.add] * 4,
                [0, 1, 2, 3],
                [3, 2, 1, 0]
        ):
            self.assertEqual(result, 3)

    def test_error_map(self):
        self.moi.set_timeout(1.5)
        success = 0
        with self.assertRaises(TimeoutError):
            for result in self.run_map(
                    [1, 1, 2, 1],
                    [operator.add] * 4,
                    [0, 1, 2, 3],
                    [3, 2, 1, 0]
            ):
                self.assertEqual(result, 3)
                success += 1
        self.assertEqual(success, 2)

    def run_shutdown_check(self, running, future):
        self.assertRaises(TimeoutError, future.result)
        running.remove(future)

    def run_submit_loop(self, executor):
        running = set()
        done_callback = functools.partial(self.run_shutdown_check, running)
        for _ in range(10):
            future = executor.submit(delay_run, 2, operator.add, 10, 20)
            running.add(future)
            future.add_done_callback(done_callback)
        time.sleep(0.5)
        return running

    def test_valid_shutdown(self):
        self.moi.set_timeout(1.5)
        running = self.run_submit_loop(self.moi)
        self.moi.shutdown()
        self.assertFalse(running)

    def test_error_shutdown(self):
        self.moi.set_timeout(1.5)
        running = self.run_submit_loop(self.moi)
        running.pop()
        self.assertRaises(KeyError, self.moi.shutdown)
        self.assertFalse(running)


class TestExecutorAPI(TestTimeout, TestModuleOrInstance, unittest.TestCase):
    CLASS = asynchronous.Executor
    MODULE_OR_INSTANCE = CLASS()
    NAME_OF_MAP = 'map'

    def test_valid_context_manager(self):
        with self.instantiate(1.5) as executor:
            running = self.run_submit_loop(executor)
        self.assertFalse(running)

    def test_error_context_manager(self):
        error = Exception()
        with self.assertRaises(Exception) as cm:
            with self.instantiate(1.5) as executor:
                running = self.run_submit_loop(executor)
                raise error
        self.assertIs(cm.exception, error)
        self.assertFalse(running)
        with self.assertRaises(KeyError):
            with self.instantiate(1.5) as executor:
                running = self.run_submit_loop(executor)
                running.pop()
        self.assertFalse(running)


class TestModuleAPI(TestModuleOrInstance, unittest.TestCase):
    MODULE_OR_INSTANCE = asynchronous
    NAME_OF_MAP = 'map_'


def verify_error():
    sys.stderr.seek(0, io.SEEK_SET)
    for line in sys.stderr:
        if line == 'queue.Full\n':
            break
    else:
        sys.stderr.seek(0, io.SEEK_SET)
        sys.__stderr__.write(sys.stderr.read())
        sys.__stderr__.flush()


def cause_error(obj):
    sys.stderr = io.StringIO()
    atexit.register(verify_error)
    inspect.currentframe().f_back.f_back.f_locals['queue'].put_nowait(obj)


def return_(obj):
    return obj


# noinspection PyUnusedLocal
def throw(exception, *args):
    raise exception


class Silencer:
    def __init__(self, silenced):
        self.__silenced = silenced
        self.__ident = _thread.get_ident()

    @property
    def silenced(self):
        return self.__silenced

    def __getattr__(self, name):
        return (getattr(self.__silenced, name)
                if _thread.get_ident() == self.__ident else
                self)

    def __call__(self, *args, **kwargs):
        return self


@contextlib.contextmanager
def silence_other_threads():
    sys.stdout, sys.stderr = Silencer(sys.stdout), Silencer(sys.stderr)
    try:
        yield
    finally:
        sys.stdout, sys.stderr = sys.stdout.silenced, sys.stderr.silenced


class TestFutureAPI(TestTimer, TestTimeout, unittest.TestCase):
    CLASS = asynchronous._Future

    def test_valid_representation(self):
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertRegex(repr(future), r'^<_Future at \d+ state=PENDING>$')
        future._set_running_or_notify_cancel()
        self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
        future._set_running_or_notify_cancel()
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=CANCELLED raised TimeoutError>$'
        )
        future = self.instantiate(None, time.sleep, (0.1,), {})
        future._set_running_or_notify_cancel()
        time.sleep(0.5)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=FINISHED raised TimeoutError>$'
        )
        self.assertIsNone(future.exception())
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=FINISHED returned NoneType>$'
        )

    def test_error_representation(self):
        future = self.instantiate(0.5, cause_error, (None,), {})
        future._set_running_or_notify_cancel()
        self.assertRaises(TypeError, future.result)
        self.assertIsInstance(future.exception(), TimeoutError)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=ERROR raised TimeoutError>$'
        )
        future = self.instantiate(0.5, cause_error, ((False, 'okay'),), {})
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 'okay')
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=ERROR returned str>$'
        )

    def test_cancel(self):
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertRaises(AttributeError, future.cancel)
        future._set_running_or_notify_cancel()
        future.cancel()
        self.assertTrue(future.cancelled())
        future = self.instantiate(None, time.sleep, (0.1,), {})
        checker = set()
        future.add_done_callback(checker.add)
        future._set_running_or_notify_cancel()
        future.cancel()
        future.cancel()
        self.assertIs(checker.pop(), future)
        self.assertFalse(checker)

    def test_cancelled(self):
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.cancelled())
        future._set_running_or_notify_cancel()
        self.assertFalse(future.cancelled())
        self.assertIsNone(future.result())
        self.assertFalse(future.cancelled())
        future = self.instantiate(None, time.sleep, (0.1,), {})
        future._set_running_or_notify_cancel()
        future.cancel()
        self.assertTrue(future.cancelled())
        future = self.instantiate(0.1, time.sleep, (1,), {})
        future._set_running_or_notify_cancel()
        time.sleep(0.5)
        self.assertTrue(future.cancelled())

    def test_running(self):
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.running())
        future._set_running_or_notify_cancel()
        self.assertTrue(future.running())
        self.assertIsNone(future.result())
        self.assertFalse(future.running())
        future = self.instantiate(None, time.sleep, (0.1,), {})
        future._set_running_or_notify_cancel()
        future.cancel()
        self.assertFalse(future.running())
        future = self.instantiate(0.1, time.sleep, (1,), {})
        future._set_running_or_notify_cancel()
        time.sleep(0.5)
        self.assertFalse(future.running())

    def test_done(self):
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.done())
        future._set_running_or_notify_cancel()
        self.assertFalse(future.done())
        self.assertIsNone(future.result())
        self.assertTrue(future.done())
        future = self.instantiate(None, time.sleep, (None,), {})
        future._set_running_or_notify_cancel()
        self.assertIsInstance(future.exception(), TypeError)
        self.assertTrue(future.done())

    def test_result_immediate(self):
        data = os.urandom(1 << 20)
        future = self.instantiate(None, return_, (data,), {})
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), data)
        test_exception = Exception('test')
        future = self.instantiate(None, throw, (test_exception,), {})
        future._set_running_or_notify_cancel()
        with self.assertRaises(Exception) as cm:
            future.result()
        self.assertIsInstance(cm.exception, type(test_exception))
        self.assertEqual(cm.exception.args, test_exception.args)

    def test_result_delay(self):
        future = self.instantiate(None, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.1)
        future = self.instantiate(None, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 5)
        self.stop_timer(1)
        future = self.instantiate(0.5, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.1)
        future = self.instantiate(0.5, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertRaises(TimeoutError, future.result)
        self.stop_timer(0.5)

    def test_result_before_running(self):
        future = self.instantiate(0.1, delay_run, (0, operator.add, 1, 2), {})
        delay_run(0.5, future._set_running_or_notify_cancel, sync=False)
        self.start_timer()
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.5)

    def run_time_check(self, test):
        self.start_timer()
        test()
        self.stop_timer(0.5)

    def run_waiter_check(self, threads, *tests):
        future = self.instantiate(1, delay_run, (0.5, operator.add, 1, 2), {})
        future._set_running_or_notify_cancel()
        # noinspection PyUnresolvedReferences
        result = queue.SimpleQueue()
        with silence_other_threads():
            for test in itertools.islice(itertools.cycle(tests), threads):
                args = self.run_time_check, (lambda: test(future),), {}, result
                _thread.start_new_thread(asynchronous._run, args)
            for _ in range(threads):
                error, value = result.get(True, 1.5)
                self.assertFalse(error)

    def test_result_with_waiters(self):
        self.run_waiter_check(
            10,
            lambda future: self.assertEqual(future.result(), 3)
        )

    def test_exception_immediate(self):
        data = os.urandom(1 << 20)
        future = self.instantiate(None, return_, (data,), {})
        future._set_running_or_notify_cancel()
        self.assertIsNone(future.exception())
        test_exception = Exception('test')
        future = self.instantiate(None, throw, (test_exception,), {})
        future._set_running_or_notify_cancel()
        self.assertIsInstance(future.exception(), type(test_exception))
        self.assertEqual(future.exception().args, test_exception.args)

    def test_exception_delay(self):
        future = self.instantiate(None, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIsNone(future.exception())
        self.stop_timer(0.1)
        future = self.instantiate(None, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIsNone(future.exception())
        self.stop_timer(1)
        future = self.instantiate(0.5, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIsNone(future.exception())
        self.stop_timer(0.1)
        future = self.instantiate(0.5, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIsInstance(future.exception(), TimeoutError)
        self.assertFalse(future.exception().args)
        self.stop_timer(0.5)

    def test_exception_before_running(self):
        future = self.instantiate(0.1, delay_run, (0, operator.add, 1, 2), {})
        delay_run(0.5, future._set_running_or_notify_cancel, sync=False)
        self.start_timer()
        self.assertIsNone(future.exception())
        self.stop_timer(0.5)

    def test_exception_with_waiters(self):
        self.run_waiter_check(
            10,
            lambda future: self.assertIsNone(future.exception())
        )

    def test_result_and_exception_waiters(self):
        self.run_waiter_check(
            10,
            lambda future: self.assertEqual(future.result(), 3),
            lambda future: self.assertIsNone(future.exception())
        )
        self.run_waiter_check(
            10,
            lambda future: self.assertIsNone(future.exception()),
            lambda future: self.assertEqual(future.result(), 3)
        )

    def test_valid_add_done_callback(self):
        future = self.instantiate(None, time.sleep, (0,), {})
        requires_callback = {future}
        future.add_done_callback(requires_callback.remove)
        self.assertIn(future, requires_callback)
        future._set_running_or_notify_cancel()
        self.assertIsNone(future.exception())
        self.assertFalse(requires_callback)
        requires_callback.add(future)
        future.add_done_callback(requires_callback.remove)
        self.assertFalse(requires_callback)

    def test_error_add_done_callback(self):
        future = self.instantiate(None, time.sleep, (0,), {})
        requires_callback = [{future} for _ in range(10)]
        callbacks = [s.remove for s in requires_callback]
        error = Exception()
        callbacks.insert(5, functools.partial(throw, error))
        for fn in callbacks:
            future.add_done_callback(fn)
        future._set_running_or_notify_cancel()
        with self.assertRaises(Exception) as cm:
            future.exception()
        self.assertIs(cm.exception, error)
        self.assertFalse(any(requires_callback))

    def test_set_running_or_notify_cancel(self):
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.running() or future.done())
        future._set_running_or_notify_cancel()
        self.assertTrue(future.running())
        future._set_running_or_notify_cancel()
        self.assertTrue(future.cancelled())

    def test_not_empty_queue(self):
        data = os.urandom(1 << 20)
        future = self.instantiate(None, return_, (data,), {})
        future._set_running_or_notify_cancel()
        # noinspection PyUnresolvedReferences
        result = queue.SimpleQueue()
        with silence_other_threads():
            for _ in range(2):
                delay_run(
                    0.1,
                    asynchronous._run,
                    lambda: self.assertEqual(future.result(), data),
                    (),
                    {},
                    result,
                    sync=False
                )
            for _ in range(2):
                error, value = result.get(True, 0.2)
                self.assertFalse(error)


if __name__ == '__main__':
    unittest.main()
0 голосов
/ 02 мая 2019

Этот вопрос был задан более 9 лет назад, и с тех пор Python изменил приличную сумму, как и мой опыт работы. После просмотра других API в стандартной библиотеке и необходимости частичной репликации, в частности, следующий модуль был написан для целей, аналогичных описанным в вопросе.

asynchronous.py

#! /usr/bin/env python3
import _thread
import abc as _abc
import collections as _collections
import enum as _enum
import math as _math
import multiprocessing as _multiprocessing
import operator as _operator
import queue as _queue
import signal as _signal
import sys as _sys
import time as _time

__all__ = (
    'Executor',
    'get_timeout',
    'set_timeout',
    'submit',
    'map_',
    'shutdown'
)


class _Base(metaclass=_abc.ABCMeta):
    __slots__ = (
        '__timeout',
    )

    @_abc.abstractmethod
    def __init__(self, timeout):
        self.timeout = _math.inf if timeout is None else timeout

    def get_timeout(self):
        return self.__timeout

    def set_timeout(self, value):
        if not isinstance(value, (float, int)):
            raise TypeError('value must be of type float or int')
        if value <= 0:
            raise ValueError('value must be greater than zero')
        self.__timeout = value

    timeout = property(get_timeout, set_timeout)


def _run_and_catch(fn, args, kwargs):
    # noinspection PyPep8,PyBroadException
    try:
        return False, fn(*args, **kwargs)
    except:
        return True, _sys.exc_info()[1]


def _run(fn, args, kwargs, queue):
    queue.put_nowait(_run_and_catch(fn, args, kwargs))


class _State(_enum.IntEnum):
    PENDING = _enum.auto()
    RUNNING = _enum.auto()
    CANCELLED = _enum.auto()
    FINISHED = _enum.auto()
    ERROR = _enum.auto()


def _run_and_catch_loop(iterable, *args, **kwargs):
    exception = None
    for fn in iterable:
        error, value = _run_and_catch(fn, args, kwargs)
        if error:
            exception = value
    if exception:
        raise exception


class _Future(_Base):
    __slots__ = (
        '__queue',
        '__process',
        '__start_time',
        '__callbacks',
        '__result',
        '__mutex'
    )

    def __init__(self, timeout, fn, args, kwargs):
        super().__init__(timeout)
        self.__queue = _multiprocessing.Queue(1)
        self.__process = _multiprocessing.Process(
            target=_run,
            args=(fn, args, kwargs, self.__queue),
            daemon=True
        )
        self.__start_time = _math.inf
        self.__callbacks = _collections.deque()
        self.__result = True, TimeoutError()
        self.__mutex = _thread.allocate_lock()

    @property
    def __state(self):
        pid, exitcode = self.__process.pid, self.__process.exitcode
        return (_State.PENDING if pid is None else
                _State.RUNNING if exitcode is None else
                _State.CANCELLED if exitcode == -_signal.SIGTERM else
                _State.FINISHED if exitcode == 0 else
                _State.ERROR)

    def __repr__(self):
        root = f'{type(self).__name__} at {id(self)} state={self.__state.name}'
        if self.__state < _State.CANCELLED:
            return f'<{root}>'
        error, value = self.__result
        suffix = f'{"raised" if error else "returned"} {type(value).__name__}'
        return f'<{root} {suffix}>'

    def __consume_callbacks(self):
        while self.__callbacks:
            yield self.__callbacks.popleft()

    def __invoke_callbacks(self):
        self.__process.join()
        _run_and_catch_loop(self.__consume_callbacks(), self)

    def cancel(self):
        self.__process.terminate()
        self.__invoke_callbacks()

    def __auto_cancel(self):
        elapsed_time = _time.perf_counter() - self.__start_time
        if elapsed_time > self.timeout:
            self.cancel()
        return elapsed_time

    def cancelled(self):
        self.__auto_cancel()
        return self.__state is _State.CANCELLED

    def running(self):
        self.__auto_cancel()
        return self.__state is _State.RUNNING

    def done(self):
        self.__auto_cancel()
        return self.__state > _State.RUNNING

    def __handle_result(self, error, value):
        self.__result = error, value
        self.__invoke_callbacks()

    def __ensure_termination(self):
        with self.__mutex:
            elapsed_time = self.__auto_cancel()
            if not self.__queue.empty():
                self.__handle_result(*self.__queue.get_nowait())
            elif self.__state < _State.CANCELLED:
                remaining_time = self.timeout - elapsed_time
                if remaining_time == _math.inf:
                    remaining_time = None
                try:
                    result = self.__queue.get(True, remaining_time)
                except _queue.Empty:
                    self.cancel()
                else:
                    self.__handle_result(*result)

    def result(self):
        self.__ensure_termination()
        error, value = self.__result
        if error:
            raise value
        return value

    def exception(self):
        self.__ensure_termination()
        error, value = self.__result
        if error:
            return value

    def add_done_callback(self, fn):
        if self.done():
            fn(self)
        else:
            self.__callbacks.append(fn)

    def _set_running_or_notify_cancel(self):
        if self.__state is _State.PENDING:
            self.__process.start()
            self.__start_time = _time.perf_counter()
        else:
            self.cancel()


class Executor(_Base):
    __slots__ = (
        '__futures',
    )

    def __init__(self, timeout=None):
        super().__init__(timeout)
        self.__futures = set()

    def submit(self, fn, *args, **kwargs):
        future = _Future(self.timeout, fn, args, kwargs)
        self.__futures.add(future)
        future.add_done_callback(self.__futures.remove)
        # noinspection PyProtectedMember
        future._set_running_or_notify_cancel()
        return future

    @staticmethod
    def __cancel_futures(iterable):
        _run_and_catch_loop(map(_operator.attrgetter('cancel'), iterable))

    def map(self, fn, *iterables):
        futures = tuple(self.submit(fn, *args) for args in zip(*iterables))

        def result_iterator():
            future_iterator = iter(futures)
            try:
                for future in future_iterator:
                    yield future.result()
            finally:
                self.__cancel_futures(future_iterator)

        return result_iterator()

    def shutdown(self):
        self.__cancel_futures(frozenset(self.__futures))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()
        return False


_executor = Executor()
get_timeout = _executor.get_timeout
set_timeout = _executor.set_timeout
submit = _executor.submit
map_ = _executor.map
shutdown = _executor.shutdown
del _executor
...