Как принудительно закрыть генератор asyn c? - PullRequest
5 голосов
/ 14 февраля 2020

Допустим, у меня есть асинхронный генератор c, подобный этому:

async def event_publisher(connection, queue):
    while True:
        if not await connection.is_disconnected():
            event = await queue.get()
            yield event
        else:
            return

Я использую его так:

published_events = event_publisher(connection, queue)
async for event in published_events:
    # do event processing here

Он работает очень хорошо, однако, когда соединение установлено отключено и не опубликовано никакого нового события, async for будет просто ждать вечно, поэтому в идеале я хотел бы принудительно закрыть генератор следующим образом:

if connection.is_disconnected():
    await published_events.aclose()

Но я получаю следующую ошибку:

RuntimeError: aclose(): asynchronous generator is already running

Есть ли способ остановить обработку уже работающего генератора?

Ответы [ 2 ]

3 голосов
/ 15 февраля 2020

Кажется, это связано с этой проблемой . Заметно:

Как показано в https://gist.github.com/1st1/d9860cbf6fe2e5d243e695809aea674c, закрытие синхронного генератора во время его итерации является ошибкой.

...

В 3.8 вызов aclose () может привести к sh с RuntimeError. Больше невозможно надежно отменить работающий асинхронный генератор.

Что ж, поскольку мы не можем отменить работающий асинхронный генератор, давайте попробуем отменить его выполняется.

import asyncio
from contextlib import suppress


async def cancel_gen(agen):
    task = asyncio.create_task(agen.__anext__())
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task
    await agen.aclose()  # probably a good idea, 
                         # but if you'll be getting errors, try to comment this line

...

if connection.is_disconnected():
    await cancel_gen(published_events)

Невозможно проверить, будет ли это работать, поскольку вы не предоставили воспроизводимый пример.

1 голос
/ 14 февраля 2020

Вы можете использовать тайм-аут в очереди, поэтому is_connected() регулярно опрашивается, если нет элемента для всплывающего сообщения:

async def event_publisher(connection, queue):
    while True:
        if not await connection.is_disconnected():
            try:
                event = await asyncio.wait_for(queue.get(), timeout=10.0)
            except asyncio.TimeoutError:
                continue
            yield event
        else:
            return

В качестве альтернативы, можно использовать Queue.get_nowait().

...