Как перебрать большой список без блокировки цикла событий - PullRequest
3 голосов
/ 23 апреля 2019

У меня есть скрипт на Python с запущенным циклом событий asyncio, я хочу знать, как перебирать большой список, не блокируя цикл событий. Таким образом, поддерживая цикл в рабочем состоянии.

Я пытался создать пользовательский класс с __aiter__ и __anext__, который не работал, я также пытался создать async function, который дает результат, но он все еще блокирует.

В настоящее время:

for index, item in enumerate(list_with_thousands_of_items):
    # do something

Пользовательский класс, который я пробовал:

class Aiter:
    def __init__(self, iterable):
        self.iter_ = iter(iterable)

    async def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            object = next(self.iter_)
        except StopIteration:
            raise StopAsyncIteration
        return object

Но это всегда приводит к

TypeError: 'async for' received an object from __aiter__ that does not implement __anext__: coroutine

Я создал async function, который работает, но блокирует цикл обработки событий:

async def async_enumerate(iterable, start:int=0):
    for idx, i in enumerate(iterable, start):
        yield idx, i

Ответы [ 2 ]

4 голосов
/ 24 апреля 2019

Как указывал @deceze, вы можете использовать await asyncio.sleep(0), чтобы явно передать управление в цикл обработки событий. Однако у этого подхода есть проблемы.

Предположительно, список довольно большой, поэтому вам потребовались специальные меры для разблокировки цикла событий. Но если список настолько велик, то принудительное выполнение каждой итерации цикла для цикла обработки событий значительно замедлит его . Конечно, вы можете смягчить это, добавив счетчик и ожидая только когда i%10 == 0 или i%100 == 0 и т. Д. Но тогда вам придется принимать произвольные решения (угадать) относительно того, как часто следует отказываться от контроля. Если вы уступаете слишком часто, вы замедляете свою работу. Если вы уступаете слишком редко, вы делаете цикл событий не отвечающим.

Этого можно избежать, используя run_in_executor, как предлагает РафаэльДера. run_in_executor принимает блокирующую функцию и выгружает ее выполнение в пул потоков. Он немедленно возвращает будущее, которое может быть await отредактировано в асинхронном режиме, и чей результат, если он будет доступен, будет возвращаемым значением функции блокировки. (Если повышается функция блокировки, исключение будет распространяться вместо этого.) Такие await будут приостанавливать сопрограмму до тех пор, пока функция не вернется или не повысится в своем потоке, позволяя циклу событий оставаться полностью функциональным в это время. Поскольку блокирующая функция и цикл обработки событий выполняются в отдельных потоках, функции не нужно ничего делать, чтобы позволить работать событию - они работают независимо. Даже GIL здесь не проблема, потому что GIL обеспечивает передачу управления между потоками.

С run_in_executor ваш код может выглядеть так:

def process_the_list():
    for index, item in enumerate(list_with_thousands_of_items):
        # do something

loop = asyncio.get_event_loop()
await loop.run_in_executor(None, process_the_list)
3 голосов
/ 23 апреля 2019

asyncio - это кооператив многозадачность.Совместная часть проистекает из того факта, что ваша функция должна возвращать выполнение обратно в цикл событий, чтобы позволить другим вещам выполняться.Если вы не await что-то (или не завершаете свою функцию), вы запутываете цикл обработки событий.

Вы можете просто await какое-то noop-событие, вероятно, наиболее подходящим является await asyncio.sleep(0).Это гарантирует, что ваша задача будет возобновлена ​​как можно скорее, но также позволит планировать другие задачи.

...