Есть несколько проблем с размещенным кодом:
Вы пытаетесь использовать as_completed
в качестве асинхронного итератора, перебирая его результаты с помощью async for
. Тем не менее, as_completed
не возвращает асинхронный итератор (по крайней мере, , но не ) и должен быть повторен с обычным for
, и ожидать каждого полученного объекта в явном виде, как показано в документации .
Вы передаете асинхронный итератор в as_completed
, в то время как он принимает обычный контейнер или (обычный) итерируемый.
Вы используете async for
в функции, не определенной с async def
, что должно быть синтаксической ошибкой. Кроме того, parse()
определяется как сопрограмма, и вы ее не ожидаете.
Хорошая новость заключается в том, что, поскольку url_generator
уже является генератором, вам не нужноas_completed
вообще, вы должны иметь возможность просто итерировать по нему:
async def main(database, formatted_start_date, formatted_end_date,
machine_id, interval):
async for url1_json, url2_json in url_generator(
formatted_start_date, formatted_end_date,
machine_id, interval)):
await parse(database, url1_json, url2_json)
Обратите внимание, однако, что async for
не будет автоматически распараллеливать итерацию, она просто позволит другим сопрограммам запускаться впараллельно с сопрограммой, которая повторяется. Чтобы распараллелить итерацию, вам нужно вызвать create_task
, чтобы отправить задачи параллельно, и использовать asyncio.Semaphore
, чтобы ограничить количество параллельных задач. Например:
async def parse(database, url1_json, url2_json, limit):
# async with applied to a semaphore ensures that no more than N
# coroutines that use the same semaphore enter the "with" block
# in parallel
async with limit:
... code goes here ...
async def main(database, formatted_start_date, formatted_end_date,
machine_id, interval):
limit = asyncio.Semaphore(10)
# create all coroutines in advance using create_task
# and run them in parallel, relying on the semaphore
# limit the number of simultaneous requests
tasks = []
async for url1_json, url2_json in url_generator(
formatted_start_date, formatted_end_date,
machine_id, interval)):
# this create_task just creates the task - it will
# start running when we return to the event loop
tasks.append(asyncio.create_task(parse(database, url1_json, url2_json, limit))
# suspend to the event loop, resuming this coroutine only after
# all the tasks have finished (or any of them raises)
await asyncio.gather(*tasks)
Обратите внимание, что url_generator
не должен быть асинхронным, потому что ему ничего не нужно await
. Вы можете определить его с помощью def
и повторить его с помощью for
.