Преобразование функции Python с обратным вызовом в ожидаемую асинхронность - PullRequest
0 голосов
/ 01 января 2019

Я хочу использовать библиотеку PyAudio в асинхронном контексте, но основная точка входа для библиотеки имеет только API на основе обратного вызова:

    import pyaudio

    def callback(in_data, frame_count, time_info, status):
        # Do something with data

    pa = pyaudio.PyAudio()
    self.stream = self.pa.open(
        stream_callback=callback
    )

Как я надеюсь использовать еепримерно так:

pa = SOME_ASYNC_COROUTINE()
async def listen():
    async for block in pa:
        # Do something with block

Проблема в том, что я не уверен, как преобразовать этот синтаксис обратного вызова в будущее, которое завершается, когда срабатывает обратный вызов.В JavaScript я бы использовал promise.promisify(), но Python, похоже, не имеет ничего подобного.

Ответы [ 2 ]

0 голосов
/ 01 января 2019

Эквивалент promisify не будет работать для этого варианта использования по двум причинам:

  • Асинхронный API PyAudio не использует цикл событий asyncio - в документации указано, что обратный вызов вызываетсяиз фоновой темы.Это требует мер предосторожности для правильной связи с asyncio.
  • Обратный вызов не может быть смоделирован одним будущим, поскольку он вызывается несколько раз, тогда как будущее может иметь только один результат.Вместо этого он должен быть преобразован в асинхронный итератор, как показано в примере кода.

Вот одна из возможных реализаций:

async def make_iter():
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    def put(*args):
        loop.call_soon_threadsafe(queue.put_nowait, args)
    async def get():
        while True:
            yield queue.get()
    return get(), put

make_iter возвращает пара из,Возвращенные объекты содержат свойство, которое вызывает обратный вызов, заставляет итератор выдавать свое следующее значение (аргументы, переданные обратному вызову).Обратный вызов может быть вызван для вызова из произвольного потока, и поэтому его можно безопасно передать на pyaudio.open, в то время как асинхронный итератор должен быть передан async for в сопрограмме asyncio, которая будет приостановлена ​​при ожидании следующего значения:

async def main():
    stream_get, stream_put = make_iter()
    stream = pa.open(stream_callback=stream_put)
    stream.start_stream()
    async for in_data, frame_count, time_info, status in stream_get:
        # ...

asyncio.get_event_loop().run_until_complete(main())

Обратите внимание, что согласно документации обратный вызов также должен возвращать значащее значение, кортеж фреймов и логический флаг.Это может быть включено в проект путем изменения функции fill, чтобы также получать данные со стороны асинхронного режима.Реализация не включена, потому что она может не иметь большого смысла без понимания предметной области.

0 голосов
/ 01 января 2019

Возможно, вы захотите использовать Future

class asyncio.Future (*, loop = None) ¶

Future представляет возможный результатасинхронная операция.Не потокобезопасен.

Будущее - это ожидаемый объект.Сопрограммы могут ожидать объекты Future до тех пор, пока им не будет установлен результат или исключение, или пока они не будут отменены.

Обычно фьючерсы используются для включения низкоуровневого кода обратного вызова (например,в протоколах, реализованных с использованием асинхронных переносов) для взаимодействия с высокоуровневым кодом асинхронного / ожидающего выполнения.

Практическое правило состоит в том, чтобы никогда не выставлять объекты Future в API, ориентированных на пользователя, и рекомендуемый способ создания объекта Future:вызвать loop.create_future ().Таким образом, альтернативные реализации цикла событий могут внедрять свои собственные оптимизированные реализации объекта Future.

Глупый пример:

def my_func(loop):
    fut = loop.create_future()
    pa.open(
        stream_callback=lambda *a, **kw: fut.set_result([a, kw])
    )
    return fut


async def main(loop):
    result = await my_func(loop)  # returns a list with args and kwargs 

Я предполагаю, что pa.open выполняется в потоке илиподпроцесс.Если нет, вам также может понадобиться заключить вызов в open с помощью asyncio.loop.run_in_executor

...