Как перебрать асинхронный итератор с таймаутом? - PullRequest
0 голосов
/ 08 мая 2018

Я думаю, что это проще понять с точки зрения кода:

async for item in timeout(something(), timeout=60, sentinel=None):
    if item is not None:
        await do_something_useful(item)
    await refresh()

Я хочу, чтобы сопрограмма выполнялась refresh как минимум каждые 60 секунд.

Ответы [ 6 ]

0 голосов
/ 12 октября 2018

В вашем вопросе отсутствует пара деталей, но если предположить, что something() является асинхронным итератором или генератором, и вы хотите, чтобы item было sentinel каждый раз, когда something не давало значение в течение тайм-аута, вот реализация timeout():

import asyncio
from typing import *

T = TypeVar('T')

# async generator, needs python 3.6
async def timeout(it: AsyncIterator[T], timeo: float, sentinel: T) -> AsyncGenerator[T, None]:
    try:
        nxt = asyncio.ensure_future(it.__anext__())
        while True:
            try:
                yield await asyncio.wait_for(asyncio.shield(nxt), timeo)
                nxt = asyncio.ensure_future(it.__anext__())
            except asyncio.TimeoutError:
                yield sentinel
    except StopAsyncIteration:
        pass
    finally:
        nxt.cancel()  # in case we're getting cancelled our self

тест:

async def something():
    yield 1
    await asyncio.sleep(1.1)
    yield 2
    await asyncio.sleep(2.1)
    yield 3


async def test():
    expect = [1, None, 2, None, None, 3]
    async for item in timeout(something(), 1, None):
        print("Check", item)
        assert item == expect.pop(0)

asyncio.get_event_loop().run_until_complete(test())

По истечении wait_for() задание будет отменено. Поэтому нам нужно обернуть it.__anext__() в задачу и затем защитить ее, чтобы иметь возможность возобновить итератор.

0 голосов
/ 21 августа 2018

Мне нужно было сделать что-то подобное, чтобы создать веб-сокет (также асинхронный итератор), который истекает, если по истечении определенного времени он не получает сообщение. Я остановился на следующем:

socket_iter = socket.__aiter__()
try:
    while True:
        message = await asyncio.wait_for(
            socket_iter.__anext__(),
            timeout=10
        )
except asyncio.futures.TimeoutError:
    # streaming is completed
    pass
0 голосов
/ 09 мая 2018

Я хочу, чтобы сопрограмма выполнялась refresh как минимум каждые 60 секунд.

Если вам нужно выполнять refresh каждые 60 секунд, независимо от того, что происходит с do_something_useful, вы можете организовать это с отдельной сопрограммой:

import time

async def my_loop():
    # ensure refresh() is invoked at least once in 60 seconds
    done = False
    async def repeat_refresh():
        last_run = time.time()
        while not done:
            await refresh()
            now = time.time()
            await asyncio.sleep(max(60 - (now - last_run), 0))
            last_run = now
    # start repeat_refresh "in the background"
    refresh_task = asyncio.get_event_loop().create_task(repeat_refresh())

    try:
        async for item in something():
            if item is not None:
                await do_something_useful(item)
            await refresh()
    finally:
        done = True
0 голосов
/ 09 мая 2018

Ответ на ваш вопрос может отличаться в зависимости от характера функции refresh. Если это очень короткая функция, ее можно свободно вызывать внутри сопрограммы. Но если это блокирующая функция (из-за сети или процессора), она должна быть запущена в в executor , чтобы избежать зависания цикла событий asyncio.

Код, приведенный ниже, показывает пример для первого случая, изменить его на запуск refresh в executor несложно.

Второе, что следует уточнить, это природа асинхронного итератора. Насколько я понимаю, вы используете его, чтобы получить результат из something или None, если истекло время ожидания.

Если я правильно понимаю логику, ваш код может быть написан более четко (аналогично не асинхронному стилю, который разрешен для asyncio), используя async_timeout менеджер контекста и вообще без использования асинхронного итератора:

import asyncio
from async_timeout import timeout


async def main():
    while True:
        try:
            async with timeout(60):
                res = await something()
                await do_something_useful(item)
        except asyncio.TimeoutError:
            pass
        finally:
            refresh()
0 голосов
/ 09 мая 2018

AsyncTimedIterable может быть реализацией timeout() в вашем коде:

class _AsyncTimedIterator:

    __slots__ = ('_iterator', '_timeout', '_sentinel')

    def __init__(self, iterable, timeout, sentinel):
        self._iterator = iterable.__aiter__()
        self._timeout = timeout
        self._sentinel = sentinel

    async def __anext__(self):
        try:
            return await asyncio.wait_for(self._iterator.__anext__(), self._timeout)
        except asyncio.TimeoutError:
            return self._sentinel


class AsyncTimedIterable:

    __slots__ = ('_factory', )

    def __init__(self, iterable, timeout=None, sentinel=None):
        self._factory = lambda: _AsyncTimedIterator(iterable, timeout, sentinel)

    def __aiter__(self):
        return self._factory()

(оригинальный ответ)

Или используйте этот класс для замены timeout() функции:

class AsyncTimedIterable:
    def __init__(self, iterable, timeout=None, sentinel=None):
        class AsyncTimedIterator:
            def __init__(self):
                self._iterator = iterable.__aiter__()

            async def __anext__(self):
                try:
                    return await asyncio.wait_for(self._iterator.__anext__(),
                                                  timeout)
                except asyncio.TimeoutError:
                    return sentinel

        self._factory = AsyncTimedIterator

    def __aiter__(self):
        return self._factory()
0 голосов
/ 09 мая 2018

Простой подход заключается в использовании asyncio.Queue и разделении кода на две сопрограммы:

queue = asyncio.Queue()
async for item in something():
    await queue.put(item)

В другом сопрограмме:

while True:
    try:
        item = await asyncio.wait_for(queue.get(), 60)
    except asyncio.TimeoutError:
        pass
    else:
        if item is None:
            break  # use None or whatever suits you to gracefully exit
        await do_something_useful(item)
    refresh()

Обратите внимание, что очередь увеличится, если обработчик do_something_useful() будет медленнее, чем something(), создающий элементы. Вы можете установить maxsize в очереди, чтобы ограничить размер буфера.

...