Распределение динамически запущенного работника среди нескольких потребителей - PullRequest
0 голосов
/ 08 ноября 2018

Я создаю рабочий класс, который подключается к внешнему потоку событий с помощью asyncio. Это один поток, но несколько потребителей могут включить его. Цель состоит в том, чтобы поддерживать соединение только тогда, когда это требуется одному или нескольким потребителям.

Мои требования следующие:

  • Рабочий экземпляр создается динамически при первом обращении к нему потребителя.
  • Когда другие потребители тогда требуют этого, они повторно используют тот же рабочий экземпляр.
  • Когда последний потребитель закрывает поток, он очищает свои ресурсы.

Это звучит достаточно просто. Тем не менее, последовательность запуска вызывает у меня проблемы, потому что она сама по себе асинхронная. Таким образом, предполагая этот интерфейс:

class Stream:
    async def start(self, *, timeout=DEFAULT_TIMEOUT):
        pass
    async def stop(self):
        pass

У меня есть следующие сценарии:

Сценарий 1 - исключение при запуске

  • Потребитель 1 просит работника начать.
  • Начинается последовательность запуска рабочего
  • Потребитель 2 просит работника начать.
  • Последовательность запуска работника вызывает исключение.
  • Оба потребителя должны видеть исключение как результат их вызова start ().

Сценарий 2 - частичная асинхронная отмена

  • Потребитель 1 просит работника начать.
  • Начинается последовательность запуска рабочего
  • Потребитель 2 просит работника начать.
  • Потребитель 1 отменяется.
  • Порядок запуска работника завершен.
  • Потребитель 2 должен увидеть успешное начало.

Сценарий 3 - полная асинхронная отмена

  • Потребитель 1 просит работника начать.
  • Начинается последовательность запуска рабочего
  • Потребитель 2 просит работника начать.
  • Потребитель 1 отменяется.
  • Потребитель 2 отменяется.
  • В результате последовательность запуска рабочего должна быть отменена.

Я изо всех сил стараюсь охватить все сценарии, не получая никаких условий гонки и беспорядка спагетти с обнаженными объектами Future или Event.


Вот попытка написания start(). Он полагается на _worker() установку asyncio.Event с именем self._worker_ready при завершении последовательности запуска:

async def start(self, timeout=None):
    assert not self.closing
    if not self._task:
        self._task = asyncio.ensure_future(self._worker())

    # Wait until worker is ready, has failed, or timeout triggers
    try:
        self._waiting_start += 1
        wait_ready = asyncio.ensure_future(self._worker_ready.wait())

        done, pending = await asyncio.wait(
            [self._task, wait_ready],
            return_when=asyncio.FIRST_COMPLETED, timeout=timeout
        )
    except asyncio.CancelledError:
        wait_ready.cancel()
        if self._waiting_start == 1:
            self.closing = True
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task    # let worker shutdown
        raise
    finally:
        self._waiting_start -= 1

    # worker failed to start - either throwing or timeout triggering
    if not self._worker_ready.is_set():
        self.closing = True
        self._task.cancel()
        wait_ready.cancel()

        try:
            await self._task        # let worker shutdown
        except asyncio.CancelledError:
            raise FeedTimeoutError('stream failed to start within %ss' % timeout)
        else:
            assert False, 'worker must propagate the exception'

То, что , кажется, работает, но это кажется слишком сложным, и его действительно сложно проверить: у работника много await точек, что приводит к комбинаторному взрыву, если я попытаюсь попробовать все возможные точки отмены и выполнение поручений.

Мне нужен лучший способ. Мне вот интересно:

  • Являются ли мои требования разумными?
  • Есть ли общий шаблон для этого?
  • Мой вопрос вызывает какой-то запах кода?

Ответы [ 2 ]

0 голосов
/ 12 ноября 2018

С моими предыдущими тестами и предложениями по интеграции от @ user4815162342 я придумал решение для многократного использования:

st = SharedTask(test())
task1 = asyncio.ensure_future(st.wait())
task2 = asyncio.ensure_future(st.wait(timeout=15))
task3 = asyncio.ensure_future(st.wait())

Это правильно: task2 отменяется через 15 секунд. Отмена задач не влияет на test(), если они не отменены. В этом случае последнее отмененное задание будет вручную отменено test() и будет ожидать завершения обработки отмены.

Если передано сопрограмма, это запланировано, только когда первая задача начинает ждать.

Наконец, ожидание общей задачи после ее завершения просто немедленно дает результат (кажется очевидным, но первоначальная версия этого не сделала).

import asyncio
from contextlib import suppress


class SharedTask:
    __slots__ = ('_clients', '_task')

    def __init__(self, task):
        if not (asyncio.isfuture(task) or asyncio.iscoroutine(task)):
            raise TypeError('task must be either a Future or a coroutine object')
        self._clients = 0
        self._task = task

    @property
    def started(self):
        return asyncio.isfuture(self._task)

    async def wait(self, *, timeout=None):
        self._task = asyncio.ensure_future(self._task)

        self._clients += 1
        try:
            return await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout)
        except:
            self._clients -= 1
            if self._clients == 0 and not self._task.done():
                self._task.cancel()
                with suppress(asyncio.CancelledError):
                    await self._task
            raise

    def cancel(self):
        if asyncio.iscoroutine(self._task):
            self._task.close()
        elif asyncio.isfuture(self._task):
            self._task.cancel()

Повторное поднятие отмены исключения задачи (упомянутое в комментариях) является преднамеренным. Это позволяет этот шаблон:

async def my_task():
    try:
        await do_stuff()
    except asyncio.CancelledError as exc:
        await flush_some_stuff()     # might raise an exception
        raise exc

Клиенты могут отменить общую задачу и обработать исключение, которое может возникнуть в результате; оно будет работать одинаково независимо от того, заключен my_task в SharedTask или нет.

0 голосов
/ 10 ноября 2018

Ваши требования звучат разумно. Я бы попытался упростить start, заменив Event будущим (в данном случае заданием), используя его как для ожидания завершения запуска, так и для распространения исключений, возникающих в ходе его выполнения, если таковые имеются. Что-то вроде:

class Stream:
    async def start(self, *, timeout=DEFAULT_TIMEOUT):
        loop = asyncio.get_event_loop()
        if self._worker_startup_task is None:
            self._worker_startup_task = \
                loop.create_task(self._worker_startup())

        self._add_user()
        try:
            await asyncio.shield(asyncio.wait_for(
                self._worker_startup_task, timeout))
        except:
            self._rm_user()
            raise

    async def _worker_startup(self):
        loop = asyncio.get_event_loop()
        await asyncio.sleep(1)      # ...
        self._worker_task = loop.create_task(self._worker())

В этом коде рабочий запуск отделен от рабочей сопрограммы и также перемещен в отдельную задачу. Эта отдельная задача может ожидаться и устраняет необходимость в выделенном Event, но, что более важно, она позволяет сценариям 1 и 2 обрабатываться одним и тем же кодом. Даже если кто-то отменит первого потребителя, задача запуска рабочего не будет отменена - отмена просто означает, что ее ожидает на одного потребителя меньше.

Таким образом, в случае отмены потребителя await self._worker_startup_task будет отлично работать для других потребителей, тогда как в случае фактического исключения при запуске работника все остальные официанты увидят то же исключение, потому что задача будет выполнена.

Сценарий 3 должен работать автоматически, потому что мы всегда отменяем запуск, который больше не может наблюдаться потребителем, независимо от причины. Если потребители ушли из-за сбоя самого запуска, то self._worker_startup_task завершится (за исключением), и его отмена будет запрещена. Если это происходит потому, что все потребители сами были отменены в ожидании запуска, тогда self._worker_startup_task.cancel() отменит последовательность запуска, как того требует сценарий 3.

Остальная часть кода будет выглядеть (не проверено):

    def __init__(self):
        self._users = 0
        self._worker_startup = None

    def _add_user(self):
        self._users += 1

    def _rm_user(self):
        self._users -= 1
        if self._users:
            return
        self._worker_startup_task.cancel()
        self._worker_startup_task = None
        if self._worker_task is not None:
            self._worker_task.cancel()
            self._worker_task = None

    async def stop(self):
        self._rm_user()

    async def _worker(self):
        # actual worker...
        while True:
            await asyncio.sleep(1)
...