Поскольку тестирование крайне важно для обеспечения правильной работы кода, для связанного ответа был создан модульный тест. Он приведён ниже и может быть запущен вместе с модулем asynchronous, который он тестирует.
test_asynchronous.py
#! /usr/bin/env python3
import _thread
import atexit
import contextlib
import functools
import inspect
import itertools
import io
import math
import operator
import os
import queue
import sys
import time
import unittest
import asynchronous
# noinspection PyUnresolvedReferences
class TestConstructor:
    """Mixin with constructor-timeout tests for the class named by ``CLASS``.

    The concrete test case must provide the ``CLASS`` attribute and the
    ``unittest.TestCase`` assertion methods used below.
    """

    def instantiate(self, *args):
        """Build ``self.CLASS`` from as many leading *args* as it accepts.

        The signature of ``CLASS`` is inspected so one argument tuple can
        be reused for constructors that take different numbers of
        parameters; surplus trailing arguments are dropped.
        """
        parameters = len(inspect.signature(self.CLASS).parameters)
        return self.CLASS(*args[:parameters])

    def test_valid_timeout(self):
        """Accepted timeouts are reported back by ``get_timeout()``."""
        # ``None`` is expected to be reported as an infinite timeout.
        instance = self.instantiate(None, print, (), {})
        self.assertEqual(instance.get_timeout(), math.inf)
        instance = self.instantiate(1, print, (), {})
        self.assertEqual(instance.get_timeout(), 1)
        # e ** (i * pi) + 1 is zero mathematically; in floating point the
        # imaginary part is a tiny rounding residue, used here as a very
        # small float timeout.
        float_timeout = (math.e ** (1j * math.pi) + 1).imag
        self.assertIsInstance(float_timeout, float)
        instance = self.instantiate(float_timeout, print, (), {})
        self.assertEqual(instance.get_timeout(), float_timeout)

    def test_error_timeout(self):
        """Non-numeric timeouts raise TypeError; zero/negative ValueError."""
        self.assertRaises(TypeError, self.instantiate, '60', print, (), {})
        self.assertRaises(ValueError, self.instantiate, 0, print, (), {})
        self.assertRaises(ValueError, self.instantiate, -1, print, (), {})
# noinspection PyUnresolvedReferences
class TestTimeout(TestConstructor):
    """Mixin adding tests for the ``timeout`` property of ``CLASS``."""

    def test_valid_property(self):
        """Positive ints and floats can be assigned and keep their type."""
        instance = self.instantiate(None, None, None, None)
        instance.timeout = 1
        self.assertIsInstance(instance.timeout, int)
        instance.timeout = 1 / 2
        self.assertIsInstance(instance.timeout, float)
        # 128 random bytes give an integer of up to 1024 bits.
        kilo_bit = int.from_bytes(os.urandom(1 << 7), 'big')
        instance.timeout = kilo_bit
        self.assertEqual(instance.timeout, kilo_bit)

    def test_error_property(self):
        """Rejected assignments raise and leave the timeout unchanged."""
        instance = self.instantiate(None, None, None, None)
        for exception, value in (
            (TypeError, 'inf'),
            (TypeError, complex(123456789, 0)),
            (ValueError, 0),
            (ValueError, 0.0),
            (ValueError, -1),
            (ValueError, -math.pi)
        ):
            with self.assertRaises(exception):
                instance.timeout = value
            # The failed assignment must not have altered the value.
            self.assertEqual(instance.timeout, math.inf)
class Timer:
    """Per-thread stopwatch used to check how long an operation took."""

    # Maps a thread identifier to the perf_counter() value at start.
    __timers = {}

    @classmethod
    def start_timer(cls):
        """Start the calling thread's timer.

        Raises:
            KeyError: if this thread already has a running timer.
        """
        ident, now = _thread.get_ident(), time.perf_counter()
        # setdefault() hands back the very object passed in only when no
        # entry existed; any other object means a timer is already running.
        if now is not cls.__timers.setdefault(ident, now):
            raise KeyError(ident)

    @classmethod
    def stop_timer(cls, expected_time, error=None):
        """Stop the calling thread's timer and compare the elapsed time.

        Args:
            expected_time: expected duration in seconds.
            error: allowed absolute deviation in seconds; ``None`` means
                a quarter of a second.

        Returns:
            True if the elapsed time is within *error* of *expected_time*.

        Raises:
            KeyError: if this thread has no running timer.
        """
        if error is None:
            error = 1 / 4  # the default is a quarter second
        used = time.perf_counter() - cls.__timers.pop(_thread.get_ident())
        diff = used - expected_time
        return -error <= diff <= +error
# noinspection PyUnresolvedReferences
class TestTimer(Timer):
    """Timer mixin for test cases: a timing miss becomes a test failure."""

    def stop_timer(self, expected_time, error=None):
        """Stop the timer and assert the elapsed time was as expected.

        Relies on ``assertTrue`` from the ``unittest.TestCase`` this
        class is mixed into.
        """
        self.assertTrue(super().stop_timer(expected_time, error))
def delay_run(delay, fn, *args, sync=True, **kwargs):
    """Call ``fn(*args, **kwargs)`` after sleeping for *delay* seconds.

    Args:
        delay: seconds to sleep before the call.
        fn: callable to invoke.
        *args: positional arguments for *fn*.
        sync: when true (the default), sleep and call in the current
            thread and return the result of *fn*; otherwise do both in a
            new thread.
        **kwargs: keyword arguments for *fn*.

    Returns:
        The result of *fn* when *sync* is true, otherwise ``None``.
    """
    def wrapper():
        time.sleep(delay)
        return fn(*args, **kwargs)

    if sync:
        return wrapper()
    # Fire and forget: the result of fn is discarded in this mode.
    _thread.start_new_thread(wrapper, ())
# noinspection PyUnresolvedReferences
class TestModuleOrInstance(TestTimer):
    """Mixin with tests shared by the module-level API and executor objects.

    Concrete test cases supply ``MODULE_OR_INSTANCE`` (the object under
    test) and ``NAME_OF_MAP`` (the name of its map function), plus the
    ``unittest.TestCase`` assertion methods.
    """

    @property
    def moi(self):
        """Return the module or instance under test."""
        return self.MODULE_OR_INSTANCE

    def test_valid_timeout(self):
        """Values given to ``set_timeout`` come back from ``get_timeout``."""
        self.moi.set_timeout(math.inf)
        self.assertEqual(self.moi.get_timeout(), math.inf)
        self.moi.set_timeout(60)
        self.assertEqual(self.moi.get_timeout(), 60)
        self.moi.set_timeout(0.05)
        self.assertEqual(self.moi.get_timeout(), 0.05)

    def test_error_timeout(self):
        """Rejected timeouts raise and leave the current value untouched."""
        self.moi.set_timeout(math.inf)
        self.assertRaises(TypeError, self.moi.set_timeout, None)
        self.assertEqual(self.moi.get_timeout(), math.inf)
        self.assertRaises(ValueError, self.moi.set_timeout, 0)
        self.assertEqual(self.moi.get_timeout(), math.inf)
        self.assertRaises(ValueError, self.moi.set_timeout, -1)
        self.assertEqual(self.moi.get_timeout(), math.inf)

    def run_submit_check(self):
        """Submit a half-second job and check its result, timing and repr."""
        self.start_timer()
        future = self.moi.submit(delay_run, 0.5, operator.add, 1, 2)
        self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.5)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=FINISHED returned int>$'
        )

    def test_submit_one_second_timeout(self):
        """A half-second job completes under a one-second timeout."""
        self.moi.set_timeout(1)
        self.run_submit_check()

    def test_submit_no_timeout(self):
        """A half-second job completes when the timeout is infinite."""
        self.moi.set_timeout(math.inf)
        self.run_submit_check()

    def test_submit_short_timeout(self):
        """A one-second job under a 0.5 s timeout reports TimeoutError."""
        self.moi.set_timeout(0.5)
        self.start_timer()
        future = self.moi.submit(delay_run, 1, operator.add, 1, 2)
        self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
        self.assertIsInstance(future.exception(), TimeoutError)
        # The wait is expected to end at the timeout, not at job completion.
        self.stop_timer(0.5)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=CANCELLED raised TimeoutError>$'
        )

    def run_map(self, *args):
        """Call the map function named by ``NAME_OF_MAP`` with ``delay_run``."""
        return getattr(self.moi, self.NAME_OF_MAP)(delay_run, *args)

    def test_valid_map(self):
        """Four one-second jobs each yield 3 under a 1.5 s timeout."""
        self.moi.set_timeout(1.5)
        for result in self.run_map(
            [1, 1, 1, 1],
            [operator.add] * 4,
            [0, 1, 2, 3],
            [3, 2, 1, 0]
        ):
            self.assertEqual(result, 3)

    def test_error_map(self):
        """Iteration yields two results, then raises at the too-slow job."""
        self.moi.set_timeout(1.5)
        success = 0
        with self.assertRaises(TimeoutError):
            # The third job sleeps 2 s, longer than the 1.5 s timeout.
            for result in self.run_map(
                [1, 1, 2, 1],
                [operator.add] * 4,
                [0, 1, 2, 3],
                [3, 2, 1, 0]
            ):
                self.assertEqual(result, 3)
                success += 1
        self.assertEqual(success, 2)

    def run_shutdown_check(self, running, future):
        """Done-callback: *future* must time out; then untrack it.

        Raises:
            KeyError: from ``set.remove`` if *future* is not in *running*.
        """
        self.assertRaises(TimeoutError, future.result)
        running.remove(future)

    def run_submit_loop(self, executor):
        """Submit ten two-second jobs and return the set tracking them.

        Every future gets ``run_shutdown_check`` as done-callback, which
        removes it from the returned set.
        """
        running = set()
        done_callback = functools.partial(self.run_shutdown_check, running)
        for _ in range(10):
            future = executor.submit(delay_run, 2, operator.add, 10, 20)
            running.add(future)
            future.add_done_callback(done_callback)
        time.sleep(0.5)
        return running

    def test_valid_shutdown(self):
        """After ``shutdown()`` every future's done-callback has run."""
        self.moi.set_timeout(1.5)
        running = self.run_submit_loop(self.moi)
        self.moi.shutdown()
        self.assertFalse(running)

    def test_error_shutdown(self):
        """A failing done-callback surfaces as KeyError from ``shutdown()``."""
        self.moi.set_timeout(1.5)
        running = self.run_submit_loop(self.moi)
        # Untrack one future so its callback's set.remove() raises KeyError.
        running.pop()
        self.assertRaises(KeyError, self.moi.shutdown)
        self.assertFalse(running)
class TestExecutorAPI(TestTimeout, TestModuleOrInstance, unittest.TestCase):
    """Run the shared tests against an ``asynchronous.Executor`` instance."""

    CLASS = asynchronous.Executor
    MODULE_OR_INSTANCE = CLASS()
    NAME_OF_MAP = 'map'

    def test_valid_context_manager(self):
        """Leaving the ``with`` block leaves no future being tracked."""
        with self.instantiate(1.5) as executor:
            running = self.run_submit_loop(executor)
        self.assertFalse(running)

    def test_error_context_manager(self):
        """Errors leave the ``with`` block; no future stays tracked."""
        error = Exception()
        with self.assertRaises(Exception) as cm:
            with self.instantiate(1.5) as executor:
                running = self.run_submit_loop(executor)
                raise error
        # The very same exception object must come out of the block.
        self.assertIs(cm.exception, error)
        self.assertFalse(running)
        with self.assertRaises(KeyError):
            with self.instantiate(1.5) as executor:
                running = self.run_submit_loop(executor)
                # Untrack one future so its done-callback raises KeyError.
                running.pop()
        self.assertFalse(running)
class TestModuleAPI(TestModuleOrInstance, unittest.TestCase):
    """Run the shared tests against the ``asynchronous`` module itself."""

    MODULE_OR_INSTANCE = asynchronous
    NAME_OF_MAP = 'map_'
def verify_error():
    """Check the captured ``sys.stderr`` for a ``queue.Full`` line.

    ``sys.stderr`` must be a seekable, readable text buffer. If no line
    equal to ``'queue.Full\\n'`` is found, the whole captured text is
    written to the real ``sys.__stderr__`` so that it is not lost.
    """
    sys.stderr.seek(0, io.SEEK_SET)
    for line in sys.stderr:
        if line == 'queue.Full\n':
            break
    else:
        # No such line was captured: replay everything on the real stream.
        sys.stderr.seek(0, io.SEEK_SET)
        sys.__stderr__.write(sys.stderr.read())
        sys.__stderr__.flush()
def cause_error(obj):
    """Put *obj* on the ``queue`` local of the frame two levels up.

    Before doing so, ``sys.stderr`` is replaced by an in-memory buffer
    and ``verify_error`` is registered to inspect that buffer at
    interpreter exit.

    NOTE(review): the frame two levels up is presumably the
    ``asynchronous`` worker running this function, and the extra item is
    presumably meant to make its own later put fail with ``queue.Full``
    -- confirm against ``asynchronous._run``.
    """
    sys.stderr = io.StringIO()
    atexit.register(verify_error)
    inspect.currentframe().f_back.f_back.f_locals['queue'].put_nowait(obj)
def return_(obj):
    """Return *obj* unchanged (a job that just hands back its argument)."""
    return obj
# noinspection PyUnusedLocal
def throw(exception, *args):
    """Raise *exception*; any further positional arguments are ignored.

    Accepting ``*args`` lets this be used where the caller passes
    arguments, e.g. as a done-callback.
    """
    raise exception
class Silencer:
    """Proxy that exposes an object only to the thread that created it.

    In the creating thread, attribute access is forwarded to the wrapped
    object. In any other thread, attribute access yields the proxy
    itself, and calling the proxy returns the proxy, so calls such as
    ``write(...)`` made from other threads do nothing.
    """

    def __init__(self, silenced):
        """Wrap *silenced* and remember the identifier of this thread."""
        self.__silenced = silenced
        self.__ident = _thread.get_ident()

    @property
    def silenced(self):
        """The wrapped object."""
        return self.__silenced

    def __getattr__(self, name):
        """Forward to the wrapped object only in the creating thread."""
        return (getattr(self.__silenced, name)
                if _thread.get_ident() == self.__ident else
                self)

    def __call__(self, *args, **kwargs):
        """Swallow the call and return the proxy itself."""
        return self
@contextlib.contextmanager
def silence_other_threads():
    """Context manager wrapping stdout and stderr in ``Silencer`` proxies.

    Inside the block, ``sys.stdout`` and ``sys.stderr`` are proxies
    created by the entering thread. On exit, the objects they wrap are
    put back, even if the block raised.
    """
    sys.stdout, sys.stderr = Silencer(sys.stdout), Silencer(sys.stderr)
    try:
        yield
    finally:
        sys.stdout, sys.stderr = sys.stdout.silenced, sys.stderr.silenced
class TestFutureAPI(TestTimer, TestTimeout, unittest.TestCase):
    """Tests for ``asynchronous._Future``.

    Futures are built directly with ``instantiate(timeout, fn, args,
    kwargs)`` and started by calling ``_set_running_or_notify_cancel()``.
    """

    CLASS = asynchronous._Future

    def test_valid_representation(self):
        """``repr`` follows the future through its regular states."""
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertRegex(repr(future), r'^<_Future at \d+ state=PENDING>$')
        future._set_running_or_notify_cancel()
        self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
        # A second call on a running future is expected to cancel it.
        future._set_running_or_notify_cancel()
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=CANCELLED raised TimeoutError>$'
        )
        future = self.instantiate(None, time.sleep, (0.1,), {})
        future._set_running_or_notify_cancel()
        time.sleep(0.5)
        # The job is over, but its outcome has not been asked for yet.
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=FINISHED raised TimeoutError>$'
        )
        self.assertIsNone(future.exception())
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=FINISHED returned NoneType>$'
        )

    def test_error_representation(self):
        """``repr`` reports the ERROR state after ``cause_error`` jobs."""
        future = self.instantiate(0.5, cause_error, (None,), {})
        future._set_running_or_notify_cancel()
        self.assertRaises(TypeError, future.result)
        self.assertIsInstance(future.exception(), TimeoutError)
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=ERROR raised TimeoutError>$'
        )
        future = self.instantiate(0.5, cause_error, ((False, 'okay'),), {})
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 'okay')
        self.assertRegex(
            repr(future),
            r'^<_Future at \d+ state=ERROR returned str>$'
        )

    def test_cancel(self):
        """``cancel()`` needs a started future and notifies callbacks once."""
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertRaises(AttributeError, future.cancel)
        future._set_running_or_notify_cancel()
        future.cancel()
        self.assertTrue(future.cancelled())
        future = self.instantiate(None, time.sleep, (0.1,), {})
        checker = set()
        future.add_done_callback(checker.add)
        future._set_running_or_notify_cancel()
        future.cancel()
        future.cancel()
        self.assertIs(checker.pop(), future)
        self.assertFalse(checker)

    def test_cancelled(self):
        """``cancelled()`` is true only after a cancel or a timeout."""
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.cancelled())
        future._set_running_or_notify_cancel()
        self.assertFalse(future.cancelled())
        self.assertIsNone(future.result())
        self.assertFalse(future.cancelled())
        future = self.instantiate(None, time.sleep, (0.1,), {})
        future._set_running_or_notify_cancel()
        future.cancel()
        self.assertTrue(future.cancelled())
        # A one-second job under a 0.1 s timeout, checked after 0.5 s.
        future = self.instantiate(0.1, time.sleep, (1,), {})
        future._set_running_or_notify_cancel()
        time.sleep(0.5)
        self.assertTrue(future.cancelled())

    def test_running(self):
        """``running()`` is true only between start and completion."""
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.running())
        future._set_running_or_notify_cancel()
        self.assertTrue(future.running())
        self.assertIsNone(future.result())
        self.assertFalse(future.running())
        future = self.instantiate(None, time.sleep, (0.1,), {})
        future._set_running_or_notify_cancel()
        future.cancel()
        self.assertFalse(future.running())
        future = self.instantiate(0.1, time.sleep, (1,), {})
        future._set_running_or_notify_cancel()
        time.sleep(0.5)
        self.assertFalse(future.running())

    def test_done(self):
        """``done()`` turns true once a result or exception was obtained."""
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.done())
        future._set_running_or_notify_cancel()
        self.assertFalse(future.done())
        self.assertIsNone(future.result())
        self.assertTrue(future.done())
        # time.sleep(None) fails with TypeError inside the job.
        future = self.instantiate(None, time.sleep, (None,), {})
        future._set_running_or_notify_cancel()
        self.assertIsInstance(future.exception(), TypeError)
        self.assertTrue(future.done())

    def test_result_immediate(self):
        """``result()`` returns the job's value or re-raises its error."""
        data = os.urandom(1 << 20)
        future = self.instantiate(None, return_, (data,), {})
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), data)
        test_exception = Exception('test')
        future = self.instantiate(None, throw, (test_exception,), {})
        future._set_running_or_notify_cancel()
        with self.assertRaises(Exception) as cm:
            future.result()
        # Only type and args are compared, not object identity.
        self.assertIsInstance(cm.exception, type(test_exception))
        self.assertEqual(cm.exception.args, test_exception.args)

    def test_result_delay(self):
        """``result()`` blocks for the job's duration or until timeout."""
        future = self.instantiate(None, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.1)
        future = self.instantiate(None, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 5)
        self.stop_timer(1)
        future = self.instantiate(0.5, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.1)
        # A one-second job under a 0.5 s timeout must give up at 0.5 s.
        future = self.instantiate(0.5, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertRaises(TimeoutError, future.result)
        self.stop_timer(0.5)

    def test_result_before_running(self):
        """``result()`` on a pending future waits until it is started."""
        future = self.instantiate(0.1, delay_run, (0, operator.add, 1, 2), {})
        # Start the future from another thread half a second from now.
        delay_run(0.5, future._set_running_or_notify_cancel, sync=False)
        self.start_timer()
        self.assertEqual(future.result(), 3)
        self.stop_timer(0.5)

    def run_time_check(self, test):
        """Run *test* and assert that it took about half a second."""
        self.start_timer()
        test()
        self.stop_timer(0.5)

    def run_waiter_check(self, threads, *tests):
        """Run *tests* round-robin in *threads* threads on one future.

        Each thread runs one test through ``run_time_check`` by way of
        ``asynchronous._run``, which is handed the ``result`` queue; one
        ``(error, value)`` pair is then read from that queue per thread
        and ``error`` must be false.
        """
        future = self.instantiate(1, delay_run, (0.5, operator.add, 1, 2), {})
        future._set_running_or_notify_cancel()
        # noinspection PyUnresolvedReferences
        result = queue.SimpleQueue()
        with silence_other_threads():
            for test in itertools.islice(itertools.cycle(tests), threads):
                # Bind the current ``test`` as a default argument: the
                # lambda runs later in another thread, by which time the
                # loop variable may already refer to a different test.
                args = (
                    self.run_time_check,
                    (lambda test=test: test(future),),
                    {},
                    result
                )
                _thread.start_new_thread(asynchronous._run, args)
            for _ in range(threads):
                error, value = result.get(True, 1.5)
                self.assertFalse(error)

    def test_result_with_waiters(self):
        """Ten threads waiting in ``result()`` all receive the value."""
        self.run_waiter_check(
            10,
            lambda future: self.assertEqual(future.result(), 3)
        )

    def test_exception_immediate(self):
        """``exception()`` returns ``None`` or the job's exception."""
        data = os.urandom(1 << 20)
        future = self.instantiate(None, return_, (data,), {})
        future._set_running_or_notify_cancel()
        self.assertIsNone(future.exception())
        test_exception = Exception('test')
        future = self.instantiate(None, throw, (test_exception,), {})
        future._set_running_or_notify_cancel()
        self.assertIsInstance(future.exception(), type(test_exception))
        self.assertEqual(future.exception().args, test_exception.args)

    def test_exception_delay(self):
        """``exception()`` blocks for the job's duration or until timeout."""
        future = self.instantiate(None, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIsNone(future.exception())
        self.stop_timer(0.1)
        future = self.instantiate(None, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIsNone(future.exception())
        self.stop_timer(1)
        future = self.instantiate(0.5, delay_run, (0, operator.add, 1, 2), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIsNone(future.exception())
        self.stop_timer(0.1)
        # A one-second job under a 0.5 s timeout must give up at 0.5 s.
        future = self.instantiate(0.5, delay_run, (1, operator.add, 2, 3), {})
        self.start_timer()
        future._set_running_or_notify_cancel()
        self.assertIsInstance(future.exception(), TimeoutError)
        self.assertFalse(future.exception().args)
        self.stop_timer(0.5)

    def test_exception_before_running(self):
        """``exception()`` on a pending future waits until it is started."""
        future = self.instantiate(0.1, delay_run, (0, operator.add, 1, 2), {})
        # Start the future from another thread half a second from now.
        delay_run(0.5, future._set_running_or_notify_cancel, sync=False)
        self.start_timer()
        self.assertIsNone(future.exception())
        self.stop_timer(0.5)

    def test_exception_with_waiters(self):
        """Ten threads waiting in ``exception()`` all receive ``None``."""
        self.run_waiter_check(
            10,
            lambda future: self.assertIsNone(future.exception())
        )

    def test_result_and_exception_waiters(self):
        """Threads mixing ``result()`` and ``exception()`` all succeed."""
        self.run_waiter_check(
            10,
            lambda future: self.assertEqual(future.result(), 3),
            lambda future: self.assertIsNone(future.exception())
        )
        # Same again with the two kinds of waiter in the opposite order.
        self.run_waiter_check(
            10,
            lambda future: self.assertIsNone(future.exception()),
            lambda future: self.assertEqual(future.result(), 3)
        )

    def test_valid_add_done_callback(self):
        """Callbacks run on completion, or at once if already complete."""
        future = self.instantiate(None, time.sleep, (0,), {})
        requires_callback = {future}
        future.add_done_callback(requires_callback.remove)
        self.assertIn(future, requires_callback)
        future._set_running_or_notify_cancel()
        self.assertIsNone(future.exception())
        self.assertFalse(requires_callback)
        # Adding a callback to the finished future must call it directly.
        requires_callback.add(future)
        future.add_done_callback(requires_callback.remove)
        self.assertFalse(requires_callback)

    def test_error_add_done_callback(self):
        """One raising callback does not stop the others from running."""
        future = self.instantiate(None, time.sleep, (0,), {})
        requires_callback = [{future} for _ in range(10)]
        callbacks = [s.remove for s in requires_callback]
        error = Exception()
        # Put a raising callback in the middle of the well-behaved ones.
        callbacks.insert(5, functools.partial(throw, error))
        for fn in callbacks:
            future.add_done_callback(fn)
        future._set_running_or_notify_cancel()
        with self.assertRaises(Exception) as cm:
            future.exception()
        self.assertIs(cm.exception, error)
        # Every set is empty again, so all ten removers were called.
        self.assertFalse(any(requires_callback))

    def test_set_running_or_notify_cancel(self):
        """The first call starts the future, the second one cancels it."""
        future = self.instantiate(None, time.sleep, (0.1,), {})
        self.assertFalse(future.running() or future.done())
        future._set_running_or_notify_cancel()
        self.assertTrue(future.running())
        future._set_running_or_notify_cancel()
        self.assertTrue(future.cancelled())

    def test_not_empty_queue(self):
        """Two threads asking late for the result both get it."""
        data = os.urandom(1 << 20)
        future = self.instantiate(None, return_, (data,), {})
        future._set_running_or_notify_cancel()
        # noinspection PyUnresolvedReferences
        result = queue.SimpleQueue()
        with silence_other_threads():
            for _ in range(2):
                delay_run(
                    0.1,
                    asynchronous._run,
                    lambda: self.assertEqual(future.result(), data),
                    (),
                    {},
                    result,
                    sync=False
                )
            for _ in range(2):
                error, value = result.get(True, 0.2)
                self.assertFalse(error)
# Run every test case in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()