Использование асинхронных генераторов и asyncio.as_completed - PullRequest
1 голос
/ 01 октября 2019

У меня есть некоторый код, который я использую, чтобы удалить URL-адрес, проанализировать информацию и затем поместить ее в БД с помощью SQLAlchemy. Я пытаюсь сделать это асинхронно, ограничивая максимальное количество одновременных запросов.

Вот мой код:

async def get_url(aiohttp_session, url1, url2):
    async with session.get(url1) as r_url1:
       if r_url1.status == 200:
          async with session.get(url2) as r_url2:
             if r_url2.status == 200:
                return await r_url1.json(), await r_url2.json()

async def url_generator(formatted_start_date, formatted_end_date, machine_id, interval):
    interval_start = formatted_start_date
    interval_end = formatted_start_date + interval

    while interval_end <= formatted_end_date:
        yield (f"https://example.org/start={interval_start}"
               f"Start={datetime.strftime(interval_start, DATETIME_FORMAT)}"
               f"&End={datetime.strftime(interval_end, DATETIME_FORMAT)}"
               f"&machines={machine_id}",
               f"https://example.org/start={interval_start}"
               f"Start={datetime.strftime(interval_start, DATETIME_FORMAT)}"
               f"&End={datetime.strftime(interval_end, DATETIME_FORMAT)}"
               f"&machines={machine_id}&groupby=Job"
               )
        interval_start += interval
        interval_end += interval

async def parse(database, url1_json, url2_json):
    """ Do some parsing and save it using credentials stored in the database object """


def main(database, formatted_start_date, formatted_end_date, machine_id, interval):
    async for url1_json, url2_json in asyncio.as_completed(url_generator(formatted_start_date, formatted_end_date, machine_id, interval)):
         parse(database, url1_json, url2_json)

Я получаю сообщение об ошибке yield from should be used as context manager expression.

Я пытался просмотреть документацию здесь а также примитивы синхронизации, и я все еще не понимаю, где я ошибся и как мне следует создавать задачи из моего генератора.

1 Ответ

2 голосов
/ 02 октября 2019

Есть несколько проблем с размещенным кодом:

  • Вы пытаетесь использовать as_completed в качестве асинхронного итератора, перебирая его результаты с помощью async for. Тем не менее, as_completed не возвращает асинхронный итератор (по крайней мере, , но не ) и должен быть повторен с обычным for, и ожидать каждого полученного объекта в явном виде, как показано в документации .

  • Вы передаете асинхронный итератор в as_completed, в то время как он принимает обычный контейнер или (обычный) итерируемый.

  • Вы используете async for в функции, не определенной с async def, что должно быть синтаксической ошибкой. Кроме того, parse() определяется как сопрограмма, и вы ее не ожидаете.

Хорошая новость заключается в том, что, поскольку url_generator уже является генератором, вам не нужноas_completed вообще, вы должны иметь возможность просто итерировать по нему:

async def main(database, formatted_start_date, formatted_end_date,
               machine_id, interval):
    async for url1_json, url2_json in url_generator(
            formatted_start_date, formatted_end_date,
            machine_id, interval)):
        await parse(database, url1_json, url2_json)

Обратите внимание, однако, что async for не будет автоматически распараллеливать итерацию, она просто позволит другим сопрограммам запускаться впараллельно с сопрограммой, которая повторяется. Чтобы распараллелить итерацию, вам нужно вызвать create_task, чтобы отправить задачи параллельно, и использовать asyncio.Semaphore, чтобы ограничить количество параллельных задач. Например:

async def parse(database, url1_json, url2_json, limit):
    # async with applied to a semaphore ensures that no more than N
    # coroutines that use the same semaphore enter the "with" block
    # in parallel
    async with limit:
        ... code goes here ...

async def main(database, formatted_start_date, formatted_end_date,
               machine_id, interval):
    limit = asyncio.Semaphore(10)

    # create all coroutines in advance using create_task
    # and run them in parallel, relying on the semaphore
    # limit the number of simultaneous requests
    tasks = []
    async for url1_json, url2_json in url_generator(
            formatted_start_date, formatted_end_date,
            machine_id, interval)):
        # this create_task just creates the task - it will
        # start running when we return to the event loop
        tasks.append(asyncio.create_task(parse(database, url1_json, url2_json, limit))

    # suspend to the event loop, resuming this coroutine only after
    # all the tasks have finished (or any of them raises)
    await asyncio.gather(*tasks)

Обратите внимание, что url_generator не должен быть асинхронным, потому что ему ничего не нужно await. Вы можете определить его с помощью def и повторить его с помощью for.

...