Дублирование кода для синхронных и асинхронных реализаций - PullRequest
16 голосов
/ 14 марта 2019

При реализации классов, которые используются как в синхронных, так и в асинхронных приложениях, я обнаруживаю, что поддерживаю практически идентичный код для обоих вариантов использования.

В качестве примера рассмотрим:

from time import sleep
import asyncio


class UselessExample:
    def __init__(self, delay):
        self.delay = delay

    async def a_ticker(self, to):
        for i in range(to):
            yield i
            await asyncio.sleep(self.delay)

    def ticker(self, to):
        for i in range(to):
            yield i
            sleep(self.delay)


def func(ue):
    for value in ue.ticker(5):
        print(value)


async def a_func(ue):
    async for value in ue.a_ticker(5):
        print(value)


def main():
    ue = UselessExample(1)
    func(ue)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(a_func(ue))


if __name__ == '__main__':
    main()

В этом примере это не так уж и плохо, ticker методы UselessExample легко поддерживать в тандеме, но вы можете себе представить, что обработка исключений и более сложные функциональные возможности могут быстро расширить метод и сделать его более проблема, даже если оба метода могут оставаться практически идентичными (только замена определенных элементов их асинхронными аналогами).

Предполагая, что нет существенной разницы, которая стоит того, чтобы оба были полностью реализованы, каков наилучший (и самый Pythonic) способ поддерживать такой класс и избегать ненужного дублирования?

Ответы [ 2 ]

11 голосов
/ 26 марта 2019

Нет единого пути для создания базы кодов на основе asyncio-сопрограмм, пригодной для использования из традиционных синхронных кодовых баз. Вы должны сделать выбор для каждого кодового пути.

Выберите и выберите из серии инструментов:

Синхронные версии с использованием async.run()

Обеспечьте синхронные обертки вокруг сопрограмм, которые блокируются, пока сопрограмма не завершится.

Даже функция асинхронного генератора , такая как ticker(), может обрабатываться таким образом в цикле:

class UselessExample:
    def __init__(self, delay):
        self.delay = delay

    async def a_ticker(self, to):
        for i in range(to):
            yield i
            await asyncio.sleep(self.delay)

    def ticker(self, to):
        agen = self.a_ticker(to)
        try:
            while True:
                yield asyncio.run(agen.__anext__())
        except StopAsyncIteration:
            return

Эти синхронные оболочки могут быть созданы с помощью вспомогательных функций:

from functools import wraps

def sync_agen_method(agen_method):
    @wraps(agen_method)
    def wrapper(self, *args, **kwargs):
        agen = agen_method(self, *args, **kwargs)   
        try:
            while True:
                yield asyncio.run(agen.__anext__())
        except StopAsyncIteration:
            return
    if wrapper.__name__[:2] == 'a_':
        wrapper.__name__ = wrapper.__name__[2:]
    return wrapper

, затем просто используйте ticker = sync_agen_method(a_ticker) в определении класса.

Методы прямой процедуры сопрограммы (не сопрограммы генератора) могут быть заключены в:

def sync_method(async_method):
    @wraps(async_method)
    def wrapper(self, *args, **kwargs):
        return async.run(async_method(self, *args, **kwargs))
    if wrapper.__name__[:2] == 'a_':
        wrapper.__name__ = wrapper.__name__[2:]
    return wrapper

Вывести общие компоненты

Реорганизовать синхронные части в генераторы, контекстные менеджеры, служебные функции и т. Д.

Для вашего конкретного примера вытащение цикла for в отдельный генератор сведет к минимуму дублирующийся код для того, как спят две версии:

class UselessExample:
    def __init__(self, delay):
        self.delay = delay

    def _ticker_gen(self, to):
        yield from range(to)

    async def a_ticker(self, to):
        for i in self._ticker_gen(to):
            yield i
            await asyncio.sleep(self.delay)

    def ticker(self, to):
        for i in self._ticker_gen(to):
            yield i
            sleep(self.delay)

Хотя это не имеет большого значения здесь , оно может работать в других контекстах.

Абстрактное преобразование синтаксического дерева

Используйте переписывание AST и карту для преобразования сопрограмм в синхронный код. Это может быть довольно хрупким, если вы не будете внимательны к тому, как распознавать такие служебные функции, как asyncio.sleep() против time.sleep():

import inspect
import ast
import copy
import textwrap
import time

asynciomap = {
    # asyncio function to (additional globals, replacement source) tuples
    "sleep": ({"time": time}, "time.sleep")
}


class AsyncToSync(ast.NodeTransformer):
    def __init__(self):
        self.globals = {}

    def visit_AsyncFunctionDef(self, node):
        return ast.copy_location(
            ast.FunctionDef(
                node.name,
                self.visit(node.args),
                [self.visit(stmt) for stmt in node.body],
                [self.visit(stmt) for stmt in node.decorator_list],
                node.returns and ast.visit(node.returns),
            ),
            node,
        )

    def visit_Await(self, node):
        return self.visit(node.value)

    def visit_Attribute(self, node):
        if (
            isinstance(node.value, ast.Name)
            and isinstance(node.value.ctx, ast.Load)
            and node.value.id == "asyncio"
            and node.attr in asynciomap
        ):
            g, replacement = asynciomap[node.attr]
            self.globals.update(g)
            return ast.copy_location(
                ast.parse(replacement, mode="eval").body,
                node
            )
        return node


def transform_sync(f):
    filename = inspect.getfile(f)
    lines, lineno = inspect.getsourcelines(f)
    ast_tree = ast.parse(textwrap.dedent(''.join(lines)), filename)
    ast.increment_lineno(ast_tree, lineno - 1)

    transformer = AsyncToSync()
    transformer.visit(ast_tree)
    tranformed_globals = {**f.__globals__, **transformer.globals}
    exec(compile(ast_tree, filename, 'exec'), tranformed_globals)
    return tranformed_globals[f.__name__]

Хотя вышеприведенное, вероятно, далеко не достаточно для того, чтобы соответствовать всем потребностям, и преобразование AST-деревьев может устрашать, приведенное выше позволит вам поддерживать только асинхронную версию и напрямую сопоставлять эту версию с синхронными версиями:

>>> import example
>>> del example.UselessExample.ticker
>>> example.main()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../example.py", line 32, in main
    func(ue)
  File "/.../example.py", line 21, in func
    for value in ue.ticker(5):
AttributeError: 'UselessExample' object has no attribute 'ticker'
>>> example.UselessExample.ticker = transform_sync(example.UselessExample.a_ticker)
>>> example.main()
0
1
2
3
4
0
1
2
3
4
0 голосов
/ 01 апреля 2019

async/await заразна по своему замыслу.

Примите, что у вашего кода будут разные пользователи - синхронные и асинхронные, и что у этих пользователей будут разные требования, что со временем реализации будут расходиться.

Публикация отдельных библиотек

Например, сравните aiohttp с aiohttp-requests и requests.

Аналогично, сравните asyncpg с psycopg2.

Как туда добраться

Opt1. (легкая) реализация клонов, позволяющая им расходиться.

Opt2. (разумный) частичный рефакторинг, например, Библиотека асинхронных файлов зависит от библиотеки импорта и импортирует ее.

Opt3. (радикально) создать «чистую» библиотеку, которую можно использовать как в синхронизации, так и в асинхронной программе. Например, см. https://github.com/python-hyper/hyper-h2.

С другой стороны, тестирование проще и тщательнее. Подумайте, насколько трудно (или невозможно) заставить тестовую среду оценить все возможные параллельные порядки выполнения в асинхронной программе. Чистая библиотека не нуждается в этом:)

С другой стороны, этот стиль программирования требует иного мышления, не всегда прост и может быть неоптимальным. Например, вместо await socket.read(2**20) вы бы написали for event in fsm.push(data): ... и полагались на то, что пользователь вашей библиотеки предоставит вам данные большими порциями.

Для контекста см. Аргумент backpressure в https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/

...