Я создаю рабочий класс, который подключается к внешнему потоку событий с помощью asyncio. Это один поток, но несколько потребителей могут включить его. Цель состоит в том, чтобы поддерживать соединение только тогда, когда это требуется одному или нескольким потребителям.
Мои требования следующие:
- Рабочий экземпляр создается динамически при первом обращении к нему потребителя.
- Когда другие потребители тогда требуют этого, они повторно используют тот же рабочий экземпляр.
- Когда последний потребитель закрывает поток, он очищает свои ресурсы.
Это звучит достаточно просто. Тем не менее, последовательность запуска вызывает у меня проблемы, потому что она сама по себе асинхронная. Таким образом, предполагая этот интерфейс:
class Stream:
async def start(self, *, timeout=DEFAULT_TIMEOUT):
pass
async def stop(self):
pass
У меня есть следующие сценарии:
Сценарий 1 - исключение при запуске
- Потребитель 1 просит работника начать.
- Начинается последовательность запуска рабочего
- Потребитель 2 просит работника начать.
- Последовательность запуска работника вызывает исключение.
- Оба потребителя должны видеть исключение как результат их вызова start ().
Сценарий 2 - частичная асинхронная отмена
- Потребитель 1 просит работника начать.
- Начинается последовательность запуска рабочего
- Потребитель 2 просит работника начать.
- Потребитель 1 отменяется.
- Порядок запуска работника завершен.
- Потребитель 2 должен увидеть успешное начало.
Сценарий 3 - полная асинхронная отмена
- Потребитель 1 просит работника начать.
- Начинается последовательность запуска рабочего
- Потребитель 2 просит работника начать.
- Потребитель 1 отменяется.
- Потребитель 2 отменяется.
- В результате последовательность запуска рабочего должна быть отменена.
Я изо всех сил стараюсь охватить все сценарии, не получая никаких условий гонки и беспорядка спагетти с обнаженными объектами Future или Event.
Вот попытка написания start()
. Он полагается на _worker()
установку asyncio.Event
с именем self._worker_ready
при завершении последовательности запуска:
async def start(self, timeout=None):
assert not self.closing
if not self._task:
self._task = asyncio.ensure_future(self._worker())
# Wait until worker is ready, has failed, or timeout triggers
try:
self._waiting_start += 1
wait_ready = asyncio.ensure_future(self._worker_ready.wait())
done, pending = await asyncio.wait(
[self._task, wait_ready],
return_when=asyncio.FIRST_COMPLETED, timeout=timeout
)
except asyncio.CancelledError:
wait_ready.cancel()
if self._waiting_start == 1:
self.closing = True
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task # let worker shutdown
raise
finally:
self._waiting_start -= 1
# worker failed to start - either throwing or timeout triggering
if not self._worker_ready.is_set():
self.closing = True
self._task.cancel()
wait_ready.cancel()
try:
await self._task # let worker shutdown
except asyncio.CancelledError:
raise FeedTimeoutError('stream failed to start within %ss' % timeout)
else:
assert False, 'worker must propagate the exception'
То, что , кажется, работает, но это кажется слишком сложным, и его действительно сложно проверить: у работника много await
точек, что приводит к комбинаторному взрыву, если я попытаюсь попробовать все возможные точки отмены и выполнение поручений.
Мне нужен лучший способ. Мне вот интересно:
- Являются ли мои требования разумными?
- Есть ли общий шаблон для этого?
- Мой вопрос вызывает какой-то запах кода?