Почему цикл событий asyncio иногда завершает задачу, даже когда сталкивается с `RuntimeError`? - PullRequest
0 голосов
/ 07 ноября 2018

Я играл с Python asyncio. Я думаю, что у меня есть разумное понимание к настоящему времени. Но следующее поведение озадачивает меня.

test.py

from threading import Thread
import asyncio

async def wait(t):
    await asyncio.sleep(t)
    print(f'waited {t} sec')

def run(loop):
    loop.run_until_complete(wait(2))

loop = asyncio.get_event_loop()
t = Thread(target=run, args=(loop,))
t.start()
loop.run_until_complete(wait(1))
t.join()

Этот код неверен. Я знаю, что . Цикл обработки событий не может быть запущен во время работы, и, как правило, он не безопасен для потоков.

Мой вопрос: почему wait(1) иногда все еще может закончить свою работу?

Вот результат двух последовательных прогонов:

>>> py test.py
... Traceback (most recent call last):
...   File "test.py", line 14, in <module>
...     loop.run_until_complete(wait(1))
...   File "C:\Python\Python37\lib\asyncio\base_events.py", line 555, in run_until_complete
...     self.run_forever()
...   File "C:\Python\Python37\lib\asyncio\base_events.py", line 510, in run_forever
... 
...     raise RuntimeError('This event loop is already running')
... RuntimeError: This event loop is already running
... waited 2 sec

>>> py test.py
... Traceback (most recent call last):
...   File "test.py", line 14, in <module>
...     loop.run_until_complete(wait(1))
...   File "C:\Python\Python37\lib\asyncio\base_events.py", line 555, in run_until_c
... omplete
...     self.run_forever()
...   File "C:\Python\Python37\lib\asyncio\base_events.py", line 510, in run_forever
... 
...     raise RuntimeError('This event loop is already running')
... RuntimeError: This event loop is already running
... waited 1 sec
... waited 2 sec

Поведение при первом запуске соответствует ожидаемому: основной поток завершается неудачно, но цикл обработки событий по-прежнему запускается wait(2) для завершения в потоке t.

Второй запуск озадачивает, как может wait(1) выполнять свою работу, когда RuntimeError уже брошен? Я предполагаю, что это связано с синхронизацией потоков и не поточно-ориентированной природой цикла событий. Но я не знаю точно, как это работает.

Ответы [ 2 ]

0 голосов
/ 07 ноября 2018

Оооо ... не важно. Я прочитал код asyncio и понял это. Это на самом деле довольно просто.

run_until_complete вызывает ensure_future(future, loop=self) до , проверяет self.is_running() (что делается в run_forever). Поскольку цикл уже запущен, он может выполнить задачу до того, как будет брошен RuntimeError. Конечно, это не всегда происходит из-за состояния гонки.

0 голосов
/ 07 ноября 2018

Исключения выбрасываются на поток . Ошибка времени выполнения возникает в другом потоке из цикла событий. Цикл событий продолжает выполняться независимо.

И wait(1) иногда могут закончить свою работу, потому что вам повезет. Внутренние структуры данных цикла asyncio не защищены от состояния гонки, вызванного использованием потоков (поэтому есть специальные методы поддержки потоков , которые вы должны использовать вместо этого). Но природа условий гонки такова, что это зависит от точного порядка событий, и этот порядок может меняться при каждом запуске вашей программы, в зависимости от того, что еще ваша ОС делает в данный момент.

Метод run_until_complete() first вызывает asyncio.ensure_task(), чтобы добавить сопрограмму в очередь задач с прикрепленным обратным вызовом «done», который снова остановит цикл обработки событий, затем вызовет loop.run_forever(). Когда сопрограмма возвращается, обратный вызов останавливает цикл. Вызов loop.run_forever() выдает здесь RuntimeError.

Когда вы делаете это из потока, задача добавляется в объект deque, присоединенный к циклу, и, если это происходит в нужный момент (например, когда рабочий цикл не занят очисткой очереди), рабочий цикл в основной поток найдет его и выполнит , даже если вызов loop.run_forever() вызвал исключение .

Все это зависит от деталей реализации. Различные версии Python, вероятно, будут демонстрировать различное поведение здесь, и если вы установите альтернативный цикл (например, uvloop), поведение почти наверняка снова будет отличаться.

Если вы хотите запланировать сопрограммы из другого потока, используйте asyncio.run_coroutine_threadsafe(); было бы:

from threading import Thread
import asyncio

async def wait(t):
    print(f'going to wait {t} seconds')
    await asyncio.sleep(t)
    print(f'waited {t} sec')

def run(loop):
    asyncio.run_coroutine_threadsafe(wait(2), loop)

loop = asyncio.get_event_loop()
t = Thread(target=run, args=(loop,))
t.start()
loop.run_until_complete(wait(1))
t.join()

Вышеприведенное фактически не завершает сопрограмму wait(2), потому что сопрограмма wait(1) запускается с loop.run_until_complete(), поэтому ее обратный вызов снова останавливает цикл, прежде чем закончится 2-секундное ожидание. Но сопрограмма фактически запущена:

going to wait 1 seconds
going to wait 2 seconds
waited 1 sec

но если вы заставите сопрограмму основного потока занять больше времени (например, с помощью wait(3)), то запланированная из потока также будет завершена. Вам придется проделать дополнительную работу, чтобы убедиться, что больше нет ожидающих выполнения задач, запланированных для запуска с циклом, прежде чем вы его выключите.

...