Как обернуть asyncio итератором - PullRequest
4 голосов
/ 13 марта 2019

У меня есть следующий упрощенный код:

async def asynchronous_function(*args, **kwds):
    statement = await prepare(query)
    async with conn.transaction():
        async for record in statement.cursor():
            ??? yield record ???

...

class Foo:

    def __iter__(self):
        records = ??? asynchronous_function ???
        yield from records

...

x = Foo()
for record in x:
    ...

Я не знаю, как заполнить ??? выше. Я хочу получить данные записи, но на самом деле не совсем понятно, как обернуть асинхронный код.

Ответы [ 2 ]

3 голосов
/ 14 марта 2019

Хотя верно то, что asyncio предназначен для использования во всех областях, иногда просто невозможно сразу преобразовать большой кусок программного обеспечения (со всеми его зависимостями) в асинхронный.К счастью, есть способы объединить устаревший синхронный код с недавно написанными асинхронными частями.Простой способ сделать это - запустить цикл обработки событий в выделенном потоке и использовать asyncio.run_coroutine_threadsafe для отправки ему задач.

С помощью этих низкоуровневых инструментов вы можете написатьуниверсальный адаптер для преобразования любого асинхронного итератора в синхронный.Например:

import asyncio, threading, queue

# create an asyncio loop that runs in the background to
# serve our asyncio needs
loop = asyncio.get_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

def wrap_async_iter(ait):
    """Wrap an asynchronous iterator into a synchronous one"""
    q = queue.Queue()
    _END = object()

    def yield_queue_items():
        while True:
            next_item = q.get()
            if next_item is _END:
                break
            yield next_item
        # After observing _END we know the aiter_to_queue coroutine has
        # completed.  Invoke result() for side effect - if an exception
        # was raised by the async iterator, it will be propagated here.
        async_result.result()

    async def aiter_to_queue():
        try:
            async for item in ait:
                q.put(item)
        finally:
            q.put(_END)

    async_result = asyncio.run_coroutine_threadsafe(aiter_to_queue(), loop)
    return yield_queue_items()

Тогда вашему коду нужно просто вызвать wrap_async_iter, чтобы обернуть асинхронный итератор в синхронизирующий:

async def mock_records():
    for i in range(3):
        yield i
        await asyncio.sleep(1)

for record in wrap_async_iter(mock_records()):
    print(record)

В вашем случае Foo.__iter__ будет использовать yield from wrap_async_iter(asynchronous_function(...)).

1 голос
/ 13 марта 2019

Если вы хотите получить все записи из асинхронного генератора, вы можете использовать async for или, для краткости, асинхронные понимания :

async def asynchronous_function(*args, **kwds):
    # ...
    yield record


async def aget_records():
    records = [
        record 
        async for record 
        in asynchronous_function()
    ]
    return records

Если вы хотите получить результат от асинхронной функции синхронно (т.е. блокирование), вы можете просто запустить эту функцию в цикле асинхронности :

def get_records():
    records = asyncio.run(aget_records())
    return records

Обратите внимание, однако, что после запуска некоторой сопрограммы в цикле событий вы теряете возможность запускать эту сопрограмму одновременно (то есть параллельно) с другими сопрограммами и, таким образом, получаете все связанные с этим преимущества.

Как уже указывалось в комментариях Винсент , asyncio - это не волшебная палочка, которая ускоряет код, а инструмент, который иногда можно использовать для одновременного выполнения различных задач ввода-вывода с низкими издержками.

Возможно, вам будет интересно прочитать этот ответ , чтобы увидеть основную идею, лежащую в основе asyncio.

...