Запуск и ожидание асинхронной функции из синхронной с использованием Python asyncio - PullRequest
2 голосов
/ 13 марта 2019

В моем коде у меня есть класс со свойствами, которые иногда должны запускать асинхронный код. Иногда мне нужно получить доступ к свойству из асинхронной функции, иногда из синхронной - поэтому я не хочу, чтобы мои свойства были асинхронными. Кроме того, у меня сложилось впечатление, что асинхронные свойства вообще - это запах кода. Поправь меня, если я ошибаюсь.

У меня проблема с выполнением асинхронного метода из свойства синхронного и блокированием дальнейшего выполнения до завершения асинхронного метода.

Вот пример кода:

import asyncio


async def main():
    print('entering main')
    synchronous_property()
    print('exiting main')


def synchronous_property():
    print('entering synchronous_property')
    loop = asyncio.get_event_loop()
    try:
        # this will raise an exception, so I catch it and ignore
        loop.run_until_complete(asynchronous())
    except RuntimeError:
        pass
    print('exiting synchronous_property')


async def asynchronous():
    print('entering asynchronous')
    print('exiting asynchronous')


asyncio.run(main())

Его выход:

entering main
entering synchronous_property
exiting synchronous_property
exiting main
entering asynchronous
exiting asynchronous

Во-первых, захват RuntimeError кажется неправильным, но если я этого не сделаю, я получу исключение RuntimeError: This event loop is already running.

Во-вторых, функция asynchronous() выполняется последней, после завершения синхронного. Я хочу выполнить некоторую обработку набора данных асинхронным методом, поэтому мне нужно дождаться его завершения. Если я добавлю await asyncio.sleep(0) после вызова synchronous_property(), он позвонит asynchronous() до окончания main(), но это мне не поможет. Мне нужно запустить asynchronous() до synchronous_property() финиша.

Что мне не хватает? Я использую Python 3.7.

Ответы [ 3 ]

1 голос
/ 14 марта 2019

Asyncio действительно настаивает на том, чтобы не допускать вложенные циклы, по проекту . Однако вы всегда можете запустить другой цикл обработки событий в другом потоке. Вот вариант, который использует пул потоков, чтобы избежать необходимости каждый раз создавать новый поток:

import asyncio, concurrent.futures

async def main():
    print('entering main')
    synchronous_property()
    print('exiting main')

pool = concurrent.futures.ThreadPoolExecutor()

def synchronous_property():
    print('entering synchronous_property')
    result = pool.submit(asyncio.run, asynchronous()).result()
    print('exiting synchronous_property', result)

async def asynchronous():
    print('entering asynchronous')
    await asyncio.sleep(1)
    print('exiting asynchronous')
    return 42

asyncio.run(main())

Этот код создает новый цикл событий на каждой границе sync-> async, поэтому не ожидайте высокой производительности, если вы делаете это много. Это можно улучшить, создав только один цикл обработки событий для каждого потока с помощью asyncio.new_event_loop и поместив его в локальную переменную потока.

0 голосов
/ 24 июля 2019

Похоже, проблема с вопросом, как указано.Повторяющийся вопрос: как установить связь между потоком (не содержащим асинхронных процессов и, следовательно, считающимся синхронизированным) и асинхронным процессом (работающим в некотором цикле событий).Один из подходов заключается в использовании двух очередей синхронизации.Процесс синхронизации помещает свои запрос / параметры в QtoAsync и ожидает QtoSync.Асинхронный процесс читает QtoAsync БЕЗ ожидания, и если он находит запрос / параметры, выполняет запрос и помещает результат в QtoSync.

import queue
QtoAsync = queue.Queue()
QtoSync = queue.Queue()
...

async def asyncProc():
    while True:
        try:
            data=QtoAsync.get_nowait()
            result = await <the async that you wish to execute>
            QtoAsync.put(result) #This can block if queue is full. you can use put_nowait and handle the exception.
        except queue.Empty:
            await asyncio.sleep(0.001) #put a nominal delay forcing this to wait in event loop
....
#start the sync process in a different thread here..
asyncio.run(main()) #main invokes the async tasks including the asyncProc

The sync thread puts it request to async using:
req = <the async that you wish to execute>
QtoAsync.put(req)
result = QtoSync.get()

Это должно работать.

Проблема с заданным вопросом: 1. Когда асинхронные процессы запускаются с асинхронными (или аналогичными) блоками выполнения до тех пор, пока не завершатся асинхронные процессы.,Отдельный поток синхронизации должен быть запущен подробно до вызова asyncio.run 2. В общем случае асинхронные процессы зависят от других асинхронных процессов в этом цикле.Поэтому вызов асинхронного процесса из другого потока не разрешен напрямую.Взаимодействие должно быть с циклом событий, и использование двух очередей - один из подходов.

0 голосов
/ 14 марта 2019

Я хочу сделать асинхронный вызов для выполнения из синхронизации и заблокировать его выполнение

Просто сделайте синхронизацию func асинхронной и дождитесь асинхронной функции.Асинхронные функции похожи на обычные функции, и вы можете поместить в них любой код, какой захотите.Если у вас все еще есть проблема, измените свой вопрос, используя реальный код, который вы пытаетесь запустить.

import asyncio


async def main():
    print('entering main')
    await synchronous_property()
    print('exiting main')


async def synchronous_property():
    print('entering synchronous_property')

    await asynchronous()

    # Do whatever sync stuff you want who cares

    print('exiting synchronous_property')


async def asynchronous():
    print('entering asynchronous')
    print('exiting asynchronous')


asyncio.run(main())
...