as_completed
является специфическим в том смысле, что он не дает фьючерсы, такие как asyncio.wait
, и их результаты, такие как asyncio.gather
.Вместо этого он выдает сопрограммы, которые вам нужно ждать (каким угодно образом), чтобы получить результаты в порядке завершения.Он не может выдать фьючерсы, которые вы передаете ему, потому что в этот момент он еще не знает, какой из переданных фьючерсов завершит следующий.
Вы можете связать произвольные данные, обернув задачу в другое будущее, результатом которого являетсяобъект задачи (к которому вы прикрепили свои данные).По сути это эквивалентно тому, что делает C # код , только без статической типизации.Принимая настройку из этого ответа , работающий пример выглядит следующим образом:
import asyncio
async def first():
await asyncio.sleep(5)
return 'first'
async def second():
await asyncio.sleep(1)
return 'second'
async def third():
await asyncio.sleep(3)
return 'third'
def ordinary_generator():
loop = asyncio.get_event_loop()
wrappers = []
for idx, coro in enumerate((first(), second(), third())):
task = loop.create_task(coro)
task.idx = idx + 1
# Wrap the task in a future that completes when the
# task does, but whose result is the task object itself.
wrapper = loop.create_future()
task.add_done_callback(wrapper.set_result)
wrappers.append(wrapper)
for x in asyncio.as_completed(wrappers):
# yield completed tasks
yield loop.run_until_complete(x)
for task in ordinary_generator():
print(task.result(), task.idx)
Другой вариант, который я бы порекомендовал, состоит в замене итерации по as_completed
на цикл, которыйзвонки asyncio.wait(return_when=FIRST_COMPLETED)
.Это также обеспечит фьючерсы по мере их завершения, но без необходимости дополнительной обертки, что приведет к немного более идиоматическому асинхронному коду.Мы вызываем ensure_future
для каждой сопрограммы, чтобы преобразовать ее в будущее, прикрепить к ней данные и только затем передать ее asyncio.wait()
.Поскольку wait
возвращает те же самые фьючерсы, прикрепленные данные находятся на них.
def ordinary_generator():
loop = asyncio.get_event_loop()
pending = []
for idx, coro in enumerate((first(), second(), third())):
task = loop.create_task(coro)
task.idx = idx + 1
pending.append(task)
while pending:
done, pending = loop.run_until_complete(asyncio.wait(
pending, return_when=asyncio.FIRST_COMPLETED))
for task in done:
yield task