Как правильно использовать asyncio.FIRST_COMPLETED - PullRequest
0 голосов
/ 20 февраля 2019

Проблема в том, что я получаю ошибку RuntimeError: Event loop is closed, даже когда использую return_when=asyncio.FIRST_COMPLETED внутри await asyncio.wait().

Мой код:

async def task_manager():
    tasks = [grab_proxy() for _ in range(10)]
    finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    for x in finished:
        result = x.result()

        if result:
            return result


def get_proxy_loop():
    loop = asyncio.new_event_loop()

    proxy = loop.run_until_complete(task_manager())

    loop.close()
    return proxy


if __name__ == '__main__':
    p = get_proxy_loop()

    print(type(p))
    print(p)

Ожидаемое поведение:

return_when=asyncio.FIRST_COMPLETED должен убить все оставшиеся задачи, когда первый результат возвращается «под капот».

Но на самом деле после первого результата все еще остаются незавершенные задачи.И после того, как я закрываю цикл в get_proxy_loop() и получаю доступ к результату внутри __main__, эти оставшиеся задачи поднимают RuntimeError: Event loop is closed.

Вывод на консоль:

<class 'str'>
78.32.35.21:55075
Task was destroyed but it is pending!
task: <Task pending coro=<grab_proxy() running at /home/pata/PycharmProjects/accs_farm/accs_farm/proxy_grabber.py:187> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fc5187a8798>()]>>
Exception ignored in: <coroutine object grab_proxy at 0x7fc5150aae60>
Traceback (most recent call last):
  File "/home/pata/proxy_grabber.py", line 187, in grab_proxy
    proxy = await async_get_proxy()
  File "/home/pata/proxy_grabber.py", line 138, in async_get_proxy
    async with session.get(provider_url, timeout=5, params=params) as r:
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/client.py", line 855, in __aenter__
    self._resp = await self._coro
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/client.py", line 396, in _request
    conn.close()
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/connector.py", line 110, in close
    self._key, self._protocol, should_close=True)
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/connector.py", line 547, in _release
Event loop is closed
    transport = protocol.close()
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/client_proto.py", line 54, in close
    transport.close()
  File "/usr/lib/python3.6/asyncio/selector_events.py", line 621, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/lib/python3.6/asyncio/base_events.py", line 580, in call_soon
    self._check_closed()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 366, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
...
...
Task was destroyed but it is pending!
task: <Task pending coro=<grab_proxy() done, defined at /home/pata/proxy_grabber.py:183> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(11)(), <TaskWakeupMethWrapper object at 0x7fc514d15e28>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<grab_proxy() done, defined at /proxy_grabber.py:183> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fc5187a8588>()]>>
Event loop is closed
Process finished with exit code 0

1 Ответ

0 голосов
/ 20 февраля 2019

Сопрограмма asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED) возвращается, когда хотя бы одна из задач завершена.Другие задачи все еще могут быть активными. не работа asyncio.wait() отменить эти задачи для вас.Вариант использования asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED) позволяет вам отслеживать задачи и действовать по их результатам по мере их выполнения ;вы обычно вызываете его несколько раз, пока все ваши задачи не будут завершены.

Из документации asyncio.wait() :

Запуск ожидаемых объектов в aws устанавливается одновременно и блокируется до тех пор, пока условие, заданное return_when .

[...]

return_when , покажет, когда эта функция должна вернуться,Это должна быть одна из следующих констант:

FIRST_COMPLETED
Функция вернется, когда любое будущее закончится или будет отменено.

[...]

В отличие от wait_for(), wait() не отменяет фьючерсы при возникновении тайм-аута.

Документация прямо заявляет, что не будет отменять фьючерсы, даже когда вы устанавливаете тайм-аут (если вы устанавливаететайм-аут, то первый набор done просто пуст, все задачи все еще активны и перечислены во втором pending set).

Если вам нужны незавершенные задачичтобы быть отмененным, сделайте это явно:

while tasks:
    finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    for x in finished:
        result = x.result()

        if result:
            # cancel the other tasks, we have a result. We need to wait for the cancellations
            # to propagate.
            for task in unfinished:
                task.cancel()
            await asyncio.wait(unfinished)
            return result

    tasks = unfinished

Демонстрация с некоторыми дополнительными заданиями на печать и случайным образом:

>>> import asyncio
>>> import random
>>> async def grab_proxy(taskid):
...     await asyncio.sleep(random.uniform(0.1, 1))
...     result = random.choice([None, None, None, 'result'])
...     print(f'Task #{taskid} producing result {result!r}')
...     return result
...
>>> async def task_manager():
...     tasks = [grab_proxy(i) for i in range(10)]
...     while tasks:
...         finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
...         for x in finished:
...             result = x.result()
...             print(f"Finished task produced {result!r}")
...             if result:
...                 # cancel the other tasks, we have a result. We need to wait for the cancellations
...                 # to propagate.
...                 print(f"Cancelling {len(unfinished)} remaining tasks")
...                 for task in unfinished:
...                     task.cancel()
...                 await asyncio.wait(unfinished)
...                 return result
...         tasks = unfinished
...
>>>
>>> def get_proxy_loop():
...     loop = asyncio.new_event_loop()
...     proxy = loop.run_until_complete(task_manager())
...     loop.close()
...     return proxy
...
>>> get_proxy_loop()
Task #7 producing result None
Finished task produced None
Task #0 producing result 'result'
Finished task produced 'result'
Cancelling 8 remaining tasks
'result'
...